You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/02/04 23:46:33 UTC

svn commit: r1442413 [2/2] - in /qpid/trunk/qpid/extras/nexus: ./ include/qpid/nexus/ src/ tests/

Modified: qpid/trunk/qpid/extras/nexus/src/message.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/message.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/message.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/message.c Mon Feb  4 22:46:32 2013
@@ -17,51 +17,14 @@
  * under the License.
  */
 
-#include <qpid/nexus/message.h>
 #include <qpid/nexus/ctools.h>
 #include <qpid/nexus/threading.h>
+#include "message_private.h"
 #include <string.h>
 #include <stdio.h>
 
-
-//
-// Per-Thread allocator
-//
-typedef struct nx_allocator_t {
-    nx_message_list_t  message_free_list;
-    nx_buffer_list_t   buffer_free_list;
-} nx_allocator_t;
-
-//
-// Global allocator (protected by a global lock)
-//
-typedef struct {
-    nx_message_list_t  message_free_list;
-    nx_buffer_list_t   buffer_free_list;
-    sys_mutex_t       *lock;
-} nx_global_allocator_t;
-
-static nx_global_allocator_t  global;
-static nx_allocator_config_t  default_config;
-static const nx_allocator_config_t *config;
-
-
-static nx_allocator_t *nx_get_allocator(void)
-{
-    static __thread nx_allocator_t *alloc = 0;
-
-    if (!alloc) {
-        alloc = NEW(nx_allocator_t);
-
-        if (!alloc)
-            return 0;
-
-        DEQ_INIT(alloc->message_free_list);
-        DEQ_INIT(alloc->buffer_free_list);
-    }
-
-    return alloc;
-}
+ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0);
+ALLOC_DEFINE(nx_message_content_t);
 
 
 static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume)
@@ -288,7 +251,7 @@ static int nx_check_and_advance(nx_buffe
 }
 
 
-static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
+static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len)
 {
     nx_buffer_t *buf = DEQ_TAIL(msg->buffers);
 
@@ -312,13 +275,13 @@ static void nx_insert(nx_message_t *msg,
 }
 
 
-static void nx_insert_8(nx_message_t *msg, uint8_t value)
+static void nx_insert_8(nx_message_content_t *msg, uint8_t value)
 {
     nx_insert(msg, &value, 1);
 }
 
 
-static void nx_insert_32(nx_message_t *msg, uint32_t value)
+static void nx_insert_32(nx_message_content_t *msg, uint32_t value)
 {
     uint8_t buf[4];
     buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
@@ -329,7 +292,7 @@ static void nx_insert_32(nx_message_t *m
 }
 
 
-static void nx_insert_64(nx_message_t *msg, uint64_t value)
+static void nx_insert_64(nx_message_content_t *msg, uint64_t value)
 {
     uint8_t buf[8];
     buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
@@ -371,7 +334,7 @@ static void nx_overwrite_32(nx_field_loc
 }
 
 
-static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
+static void nx_start_list_performative(nx_message_content_t *msg, uint8_t code)
 {
     //
     // Insert the short-form performative tag
@@ -409,219 +372,116 @@ static void nx_start_list_performative(n
 }
 
 
-static void nx_end_list(nx_message_t *msg)
+static void nx_end_list(nx_message_content_t *msg)
 {
     nx_overwrite_32(&msg->compose_length, msg->length);
     nx_overwrite_32(&msg->compose_count,  msg->count);
 }
 
 
-const nx_allocator_config_t *nx_allocator_default_config(void)
+nx_message_t *nx_allocate_message()
 {
-    default_config.buffer_size                     = 1024;
-    default_config.buffer_preallocation_count      = 512;
-    default_config.buffer_rebalancing_batch_count  = 16;
-    default_config.buffer_local_storage_max        = 64;
-    default_config.buffer_free_list_max            = 1000000;
-    default_config.message_allocation_batch_count  = 256;
-    default_config.message_rebalancing_batch_count = 64;
-    default_config.message_local_storage_max       = 256;
-
-    return &default_config;
-}
-
+    nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t();
+    if (!msg)
+        return 0;
 
-void nx_allocator_initialize(const nx_allocator_config_t *c)
-{
-    config = c;
+    DEQ_ITEM_INIT(msg);
+    msg->content      = new_nx_message_content_t();
+    msg->out_delivery = 0;
 
-    // Initialize the fields in the global structure.
-    DEQ_INIT(global.message_free_list);
-    DEQ_INIT(global.buffer_free_list);
-    global.lock = sys_mutex();
+    if (msg->content == 0) {
+        free_nx_message_t((nx_message_t*) msg);
+        return 0;
+    }
 
-    // Pre-allocate buffers according to the configuration
-    int          i;
-    nx_buffer_t *buf;
+    memset(msg->content, 0, sizeof(nx_message_content_t));
+    msg->content->lock      = sys_mutex();
+    msg->content->ref_count = 1;
 
-    for (i = 0; i < config->buffer_preallocation_count; i++) {
-        buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
-        DEQ_ITEM_INIT(buf);
-        DEQ_INSERT_TAIL(global.buffer_free_list, buf);
-    }
+    return (nx_message_t*) msg;
 }
 
 
-void nx_allocator_finalize(void)
+void nx_free_message(nx_message_t *in_msg)
 {
-    // TODO - Free buffers and messages
-}
+    uint32_t rc;
+    nx_message_pvt_t     *msg     = (nx_message_pvt_t*) in_msg;
+    nx_message_content_t *content = msg->content;
 
+    sys_mutex_lock(content->lock);
+    rc = --content->ref_count;
+    sys_mutex_unlock(content->lock);
 
-nx_message_t *nx_allocate_message(void)
-{
-    nx_allocator_t *alloc = nx_get_allocator();
-    nx_message_t   *msg;
-    int             i;
+    if (rc == 0) {
+        nx_buffer_t *buf = DEQ_HEAD(content->buffers);
 
-    if (DEQ_SIZE(alloc->message_free_list) == 0) {
-        //
-        // The local free list is empty, rebalance a batch of objects from the global
-        // free list.
-        //
-        sys_mutex_lock(global.lock);
-        if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) {
-            for (i = 0; i < config->message_rebalancing_batch_count; i++) {
-                msg = DEQ_HEAD(global.message_free_list);
-                DEQ_REMOVE_HEAD(global.message_free_list);
-                DEQ_INSERT_TAIL(alloc->message_free_list, msg);
-            }
+        while (buf) {
+            DEQ_REMOVE_HEAD(content->buffers);
+            nx_free_buffer(buf);
+            buf = DEQ_HEAD(content->buffers);
         }
-        sys_mutex_unlock(global.lock);
-    }
 
-    if (DEQ_SIZE(alloc->message_free_list) == 0) {
-        //
-        // The local free list is still empty.  This means there were not enough objects on the
-        // global free list to make up a batch.  Allocate new objects from the heap and store
-        // them in the local free list.
-        //
-        nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count);
-        memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count);
-        for (i = 0; i < config->message_allocation_batch_count; i++) {
-            DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]);
-        }
+        sys_mutex_free(content->lock);
+        free_nx_message_content_t(content);
     }
 
-    //
-    // If the local free list is still empty, we're out of memory.
-    //
-    if (DEQ_SIZE(alloc->message_free_list) == 0)
-        return 0;
-
-    msg = DEQ_HEAD(alloc->message_free_list);
-    DEQ_REMOVE_HEAD(alloc->message_free_list);
-
-    DEQ_INIT(msg->buffers);
-    msg->in_delivery = NULL;
-    msg->out_delivery = NULL;
-    msg->section_message_header.buffer = 0;
-    msg->section_message_header.parsed = 0;
-    msg->section_delivery_annotation.buffer = 0;
-    msg->section_delivery_annotation.parsed = 0;
-    msg->section_message_annotation.buffer = 0;
-    msg->section_message_annotation.parsed = 0;
-    msg->section_message_properties.buffer = 0;
-    msg->section_message_properties.parsed = 0;
-    msg->section_application_properties.buffer = 0;
-    msg->section_application_properties.parsed = 0;
-    msg->section_body.buffer = 0;
-    msg->section_body.parsed = 0;
-    msg->section_footer.buffer = 0;
-    msg->section_footer.parsed = 0;
-    msg->field_user_id.buffer = 0;
-    msg->field_user_id.parsed = 0;
-    msg->field_to.buffer = 0;
-    msg->field_to.parsed = 0;
-    msg->body.buffer = 0;
-    msg->body.parsed = 0;
-    return msg;
+    free_nx_message_t((nx_message_t*) msg);
 }
 
 
-nx_buffer_t *nx_allocate_buffer(void)
-{
-    nx_allocator_t *alloc = nx_get_allocator();
-    nx_buffer_t    *buf;
-    int             i;
-
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
-        sys_mutex_lock(global.lock);
-        if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) {
-            // Rebalance a batch of free descriptors to the local free list.
-            for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
-                buf = DEQ_HEAD(global.buffer_free_list);
-                DEQ_REMOVE_HEAD(global.buffer_free_list);
-                DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-            }
-        }
-        sys_mutex_unlock(global.lock);
-    }
-
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
-        // Allocate a buffer from the heap
-        buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
-        DEQ_ITEM_INIT(buf);
-        DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-    }
+nx_message_t *nx_message_copy(nx_message_t *in_msg)
+{
+    nx_message_pvt_t     *msg     = (nx_message_pvt_t*) in_msg;
+    nx_message_content_t *content = msg->content;
+    nx_message_pvt_t     *copy    = (nx_message_pvt_t*) new_nx_message_t();
 
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0)
+    if (!copy)
         return 0;
 
-    buf = DEQ_HEAD(alloc->buffer_free_list);
-    DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+    DEQ_ITEM_INIT(copy);
+    copy->content      = content;
+    copy->out_delivery = 0;
 
-    buf->size = 0;
+    sys_mutex_lock(content->lock);
+    content->ref_count++;
+    sys_mutex_unlock(content->lock);
 
-    return buf;
+    return (nx_message_t*) copy;
 }
 
 
-void nx_free_message(nx_message_t *msg)
+void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery)
 {
-    nx_allocator_t *alloc = nx_get_allocator();
+    ((nx_message_pvt_t*) msg)->out_delivery = delivery;
+}
 
-    // Free any buffers in the message
-    int          i;
-    nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
-    while (buf) {
-        DEQ_REMOVE_HEAD(msg->buffers);
-        nx_free_buffer(buf);
-        buf = DEQ_HEAD(msg->buffers);
-    }
 
-    DEQ_INSERT_TAIL(alloc->message_free_list, msg);
-    if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) {
-        //
-        // The local free list has exceeded the threshold for local storage.
-        // Rebalance a batch of free objects to the global free list.
-        //
-        sys_mutex_lock(global.lock);
-        for (i = 0; i < config->message_rebalancing_batch_count; i++) {
-            msg = DEQ_HEAD(alloc->message_free_list);
-            DEQ_REMOVE_HEAD(alloc->message_free_list);
-            DEQ_INSERT_TAIL(global.message_free_list, msg);
-        }
-        sys_mutex_unlock(global.lock);
-    }
+pn_delivery_t *nx_message_out_delivery(nx_message_t *msg)
+{
+    return ((nx_message_pvt_t*) msg)->out_delivery;
 }
 
 
-void nx_free_buffer(nx_buffer_t *buf)
+void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery)
 {
-    nx_allocator_t *alloc = nx_get_allocator();
-    int             i;
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    content->in_delivery = delivery;
+}
 
-    DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-    if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) {
-        // Rebalance a batch of free descriptors to the global free list.
-        sys_mutex_lock(global.lock);
-        for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
-            buf = DEQ_HEAD(alloc->buffer_free_list);
-            DEQ_REMOVE_HEAD(alloc->buffer_free_list);
-            DEQ_INSERT_TAIL(global.buffer_free_list, buf);
-        }
-        sys_mutex_unlock(global.lock);
-    }
+
+pn_delivery_t *nx_message_in_delivery(nx_message_t *msg)
+{
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    return content->in_delivery;
 }
 
 
 nx_message_t *nx_message_receive(pn_delivery_t *delivery)
 {
-    pn_link_t    *link = pn_delivery_link(delivery);
-    nx_message_t *msg  = (nx_message_t*) pn_delivery_get_context(delivery);
-    ssize_t       rc;
-    nx_buffer_t  *buf;
+    pn_link_t        *link = pn_delivery_link(delivery);
+    nx_message_pvt_t *msg  = (nx_message_pvt_t*) pn_delivery_get_context(delivery);
+    ssize_t           rc;
+    nx_buffer_t      *buf;
 
     //
     // If there is no message associated with the delivery, this is the first time
@@ -629,15 +489,16 @@ nx_message_t *nx_message_receive(pn_deli
     // link it and the delivery together.
     //
     if (!msg) {
-        msg = nx_allocate_message();
+        msg = (nx_message_pvt_t*) nx_allocate_message();
         pn_delivery_set_context(delivery, (void*) msg);
 
         //
         // Record the incoming delivery only if it is not settled.  If it is 
-        // settled, there's no need to propagate disposition back to the sender.
+        // settled, it should not be recorded as no future operations on it are
+        // permitted.
         //
         if (!pn_delivery_settled(delivery))
-            msg->in_delivery = delivery;
+            msg->content->in_delivery = delivery;
     }
 
     //
@@ -645,10 +506,10 @@ nx_message_t *nx_message_receive(pn_deli
     // we will store incoming message data.  If there is no buffer in the message, allocate
     // an empty one and add it to the message.
     //
-    buf = DEQ_TAIL(msg->buffers);
+    buf = DEQ_TAIL(msg->content->buffers);
     if (!buf) {
         buf = nx_allocate_buffer();
-        DEQ_INSERT_TAIL(msg->buffers, buf);
+        DEQ_INSERT_TAIL(msg->content->buffers, buf);
     }
 
     while (1) {
@@ -667,10 +528,10 @@ nx_message_t *nx_message_receive(pn_deli
             // of the buffer size.
             //
             if (nx_buffer_size(buf) == 0) {
-                DEQ_REMOVE_TAIL(msg->buffers);
+                DEQ_REMOVE_TAIL(msg->content->buffers);
                 nx_free_buffer(buf);
             }
-            return msg;
+            return (nx_message_t*) msg;
         }
 
         if (rc > 0) {
@@ -686,7 +547,7 @@ nx_message_t *nx_message_receive(pn_deli
             //
             if (nx_buffer_capacity(buf) == 0) {
                 buf = nx_allocate_buffer();
-                DEQ_INSERT_TAIL(msg->buffers, buf);
+                DEQ_INSERT_TAIL(msg->content->buffers, buf);
             }
         } else
             //
@@ -697,11 +558,24 @@ nx_message_t *nx_message_receive(pn_deli
             break;
     }
 
-    return NULL;
+    return 0;
 }
 
 
-int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
+void nx_message_send(nx_message_t *in_msg, pn_link_t *link)
+{
+    nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+    nx_buffer_t      *buf = DEQ_HEAD(msg->content->buffers);
+
+    // TODO - Handle cases where annotations have been added or modified
+    while (buf) {
+        pn_link_send(link, (char*) nx_buffer_base(buf), nx_buffer_size(buf));
+        buf = DEQ_NEXT(buf);
+    }
+}
+
+
+int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth)
 {
 
 #define LONG  10
@@ -712,8 +586,8 @@ int nx_message_check(nx_message_t *msg, 
 #define DELIVERY_ANNOTATION_SHORT     (unsigned char*) "\x00\x53\x71"
 #define MESSAGE_ANNOTATION_LONG       (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
 #define MESSAGE_ANNOTATION_SHORT      (unsigned char*) "\x00\x53\x72"
-#define MESSAGE_PROPERTIES_LONG       (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
-#define MESSAGE_PROPERTIES_SHORT      (unsigned char*) "\x00\x53\x73"
+#define PROPERTIES_LONG               (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define PROPERTIES_SHORT              (unsigned char*) "\x00\x53\x73"
 #define APPLICATION_PROPERTIES_LONG   (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
 #define APPLICATION_PROPERTIES_SHORT  (unsigned char*) "\x00\x53\x74"
 #define BODY_DATA_LONG                (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
@@ -726,8 +600,10 @@ int nx_message_check(nx_message_t *msg, 
 #define TAGS_MAP                      (unsigned char*) "\xc1\xd1"
 #define TAGS_BINARY                   (unsigned char*) "\xa0\xb0"
 
-    nx_buffer_t   *buffer = DEQ_HEAD(msg->buffers);
-    unsigned char *cursor;
+    nx_message_pvt_t     *msg = (nx_message_pvt_t*) in_msg;
+    nx_message_content_t *content = msg->content;
+    nx_buffer_t          *buffer = DEQ_HEAD(content->buffers);
+    unsigned char        *cursor;
 
     if (!buffer)
         return 0; // Invalid - No data in the message
@@ -740,9 +616,9 @@ int nx_message_check(nx_message_t *msg, 
     //
     // MESSAGE HEADER
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG,  LONG,  TAGS_LIST, &msg->section_message_header))
+    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG,  LONG,  TAGS_LIST, &content->section_message_header))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header))
+    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header))
         return 0;
 
     if (depth == NX_DEPTH_HEADER)
@@ -751,9 +627,9 @@ int nx_message_check(nx_message_t *msg, 
     //
     // DELIVERY ANNOTATION
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &msg->section_delivery_annotation))
+    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &content->section_delivery_annotation))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &msg->section_delivery_annotation))
+    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &content->section_delivery_annotation))
         return 0;
 
     if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS)
@@ -762,31 +638,31 @@ int nx_message_check(nx_message_t *msg, 
     //
     // MESSAGE ANNOTATION
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &msg->section_message_annotation))
+    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &content->section_message_annotation))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &msg->section_message_annotation))
+    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &content->section_message_annotation))
         return 0;
 
     if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS)
         return 1;
 
     //
-    // MESSAGE PROPERTIES
+    // PROPERTIES
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG,  LONG,  TAGS_LIST, &msg->section_message_properties))
+    if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG,  LONG,  TAGS_LIST, &content->section_message_properties))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties))
+    if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties))
         return 0;
 
-    if (depth == NX_DEPTH_MESSAGE_PROPERTIES)
+    if (depth == NX_DEPTH_PROPERTIES)
         return 1;
 
     //
     // APPLICATION PROPERTIES
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG,  LONG,  TAGS_MAP, &msg->section_application_properties))
+    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG,  LONG,  TAGS_MAP, &content->section_application_properties))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties))
+    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties))
         return 0;
 
     if (depth == NX_DEPTH_APPLICATION_PROPERTIES)
@@ -795,13 +671,13 @@ int nx_message_check(nx_message_t *msg, 
     //
     // BODY  (Note that this function expects a single data section or a single AMQP sequence)
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG,      LONG,  TAGS_BINARY, &msg->section_body))
+    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG,      LONG,  TAGS_BINARY, &content->section_body))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT,     SHORT, TAGS_BINARY, &msg->section_body))
+    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT,     SHORT, TAGS_BINARY, &content->section_body))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG,  LONG,  TAGS_LIST,   &msg->section_body))
+    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG,  LONG,  TAGS_LIST,   &content->section_body))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST,   &msg->section_body))
+    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST,   &content->section_body))
         return 0;
 
     if (depth == NX_DEPTH_BODY)
@@ -810,67 +686,72 @@ int nx_message_check(nx_message_t *msg, 
     //
     // FOOTER
     //
-    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG,  LONG,  TAGS_MAP, &msg->section_footer))
+    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG,  LONG,  TAGS_MAP, &content->section_footer))
         return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer))
+    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer))
         return 0;
 
     return 1;
 }
 
 
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg)
+nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field)
 {
-    while (1) {
-        if (msg->field_to.parsed)
-            return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL);
+    nx_message_content_t *content = MSG_CONTENT(msg);
 
-        if (msg->section_message_properties.parsed == 0)
-            break;
+    switch (field) {
+    case NX_FIELD_TO:
+        while (1) {
+            if (content->field_to.parsed)
+                return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL);
 
-        nx_buffer_t   *buffer = msg->section_message_properties.buffer;
-        unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset;
+            if (content->section_message_properties.parsed == 0)
+                break;
 
-        int count = start_list(&cursor, &buffer);
-        int result;
+            nx_buffer_t   *buffer = content->section_message_properties.buffer;
+            unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset;
 
-        if (count < 3)
-            break;
+            int count = start_list(&cursor, &buffer);
+            int result;
 
-        result = traverse_field(&cursor, &buffer, 0); // message_id
-        if (!result) return 0;
-        result = traverse_field(&cursor, &buffer, 0); // user_id
-        if (!result) return 0;
-        result = traverse_field(&cursor, &buffer, &msg->field_to); // to
-        if (!result) return 0;
-    }
+            if (count < 3)
+                break;
 
-    return 0;
-}
+            result = traverse_field(&cursor, &buffer, 0); // message_id
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, 0); // user_id
+            if (!result) return 0;
+            result = traverse_field(&cursor, &buffer, &content->field_to); // to
+            if (!result) return 0;
+        }
+        break;
 
+    case NX_FIELD_BODY:
+        while (1) {
+            if (content->body.parsed)
+                return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL);
 
-nx_field_iterator_t *nx_message_body(nx_message_t *msg)
-{
-    while (1) {
-        if (msg->body.parsed)
-            return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL);
+            if (content->section_body.parsed == 0)
+                break;
 
-        if (msg->section_body.parsed == 0)
-            break;
+            nx_buffer_t   *buffer = content->section_body.buffer;
+            unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset;
+            int result;
 
-        nx_buffer_t   *buffer = msg->section_body.buffer;
-        unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset;
-        int result;
+            result = traverse_field(&cursor, &buffer, &content->body);
+            if (!result) return 0;
+        }
+        break;
 
-        result = traverse_field(&cursor, &buffer, &msg->body);
-        if (!result) return 0;
+    default:
+        break;
     }
 
     return 0;
 }
 
 
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain)
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers)
 {
     nx_message_begin_header(msg);
     nx_message_insert_boolean(msg, 0);  // durable
@@ -896,20 +777,20 @@ void nx_message_compose_1(nx_message_t *
     //nx_message_insert_null(msg);          // reply-to-group-id
     nx_message_end_message_properties(msg);
 
-    if (buf_chain)
-        nx_message_append_body_data(msg, buf_chain);
+    if (buffers)
+        nx_message_append_body_data(msg, buffers);
 }
 
 
 void nx_message_begin_header(nx_message_t *msg)
 {
-    nx_start_list_performative(msg, 0x70);
+    nx_start_list_performative(MSG_CONTENT(msg), 0x70);
 }
 
 
 void nx_message_end_header(nx_message_t *msg)
 {
-    nx_end_list(msg);
+    nx_end_list(MSG_CONTENT(msg));
 }
 
 
@@ -939,13 +820,13 @@ void nx_message_end_message_annotations(
 
 void nx_message_begin_message_properties(nx_message_t *msg)
 {
-    nx_start_list_performative(msg, 0x73);
+    nx_start_list_performative(MSG_CONTENT(msg), 0x73);
 }
 
 
 void nx_message_end_message_properties(nx_message_t *msg)
 {
-    nx_end_list(msg);
+    nx_end_list(MSG_CONTENT(msg));
 }
 
 
@@ -961,34 +842,40 @@ void nx_message_end_application_properti
 }
 
 
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain)
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers)
 {
-    uint32_t     len   = 0;
-    nx_buffer_t *buf   = buf_chain;
-    nx_buffer_t *last  = 0;
-    size_t       count = 0;
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    nx_buffer_t          *buf     = DEQ_HEAD(*buffers);
+    uint32_t              len     = 0;
 
+    //
+    // Calculate the size of the body to be appended.
+    //
     while (buf) {
         len += nx_buffer_size(buf);
-        count++;
-        last = buf;
         buf = DEQ_NEXT(buf);
     }
 
-    nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3);
+    //
+    // Insert a DATA section performative header.
+    //
+    nx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
     if (len < 256) {
-        nx_insert_8(msg, 0xa0);  // vbin8
-        nx_insert_8(msg, (uint8_t) len);
+        nx_insert_8(content, 0xa0);  // vbin8
+        nx_insert_8(content, (uint8_t) len);
     } else {
-        nx_insert_8(msg, 0xb0);  // vbin32
-        nx_insert_32(msg, len);
+        nx_insert_8(content, 0xb0);  // vbin32
+        nx_insert_32(content, len);
     }
 
-    if (len > 0) {
-        buf_chain->prev         = msg->buffers.tail;
-        msg->buffers.tail->next = buf_chain;
-        msg->buffers.tail       = last;
-        msg->buffers.size      += count;
+    //
+    // Move the supplied buffers to the tail of the message's buffer list.
+    //
+    buf = DEQ_HEAD(*buffers);
+    while (buf) {
+        DEQ_REMOVE_HEAD(*buffers);
+        DEQ_INSERT_TAIL(content->buffers, buf);
+        buf = DEQ_HEAD(*buffers);
     }
 }
 
@@ -1017,148 +904,151 @@ void nx_message_end_footer(nx_message_t 
 
 void nx_message_insert_null(nx_message_t *msg)
 {
-    nx_insert_8(msg, 0x40);
-    msg->count++;
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    nx_insert_8(content, 0x40);
+    content->count++;
 }
 
 
 void nx_message_insert_boolean(nx_message_t *msg, int value)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     if (value)
-        nx_insert(msg, (const uint8_t*) "\x56\x01", 2);
+        nx_insert(content, (const uint8_t*) "\x56\x01", 2);
     else
-        nx_insert(msg, (const uint8_t*) "\x56\x00", 2);
-    msg->count++;
+        nx_insert(content, (const uint8_t*) "\x56\x00", 2);
+    content->count++;
 }
 
 
 void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value)
 {
-    nx_insert_8(msg, 0x50);
-    nx_insert_8(msg, value);
-    msg->count++;
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    nx_insert_8(content, 0x50);
+    nx_insert_8(content, value);
+    content->count++;
 }
 
 
 void nx_message_insert_uint(nx_message_t *msg, uint32_t value)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     if (value == 0) {
-        nx_insert_8(msg, 0x43);  // uint0
+        nx_insert_8(content, 0x43);  // uint0
     } else if (value < 256) {
-        nx_insert_8(msg, 0x52);  // smalluint
-        nx_insert_8(msg, (uint8_t) value);
+        nx_insert_8(content, 0x52);  // smalluint
+        nx_insert_8(content, (uint8_t) value);
     } else {
-        nx_insert_8(msg, 0x70);  // uint
-        nx_insert_32(msg, value);
+        nx_insert_8(content, 0x70);  // uint
+        nx_insert_32(content, value);
     }
-    msg->count++;
+    content->count++;
 }
 
 
 void nx_message_insert_ulong(nx_message_t *msg, uint64_t value)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     if (value == 0) {
-        nx_insert_8(msg, 0x44);  // ulong0
+        nx_insert_8(content, 0x44);  // ulong0
     } else if (value < 256) {
-        nx_insert_8(msg, 0x53);  // smallulong
-        nx_insert_8(msg, (uint8_t) value);
+        nx_insert_8(content, 0x53);  // smallulong
+        nx_insert_8(content, (uint8_t) value);
     } else {
-        nx_insert_8(msg, 0x80);  // ulong
-        nx_insert_64(msg, value);
+        nx_insert_8(content, 0x80);  // ulong
+        nx_insert_64(content, value);
     }
-    msg->count++;
+    content->count++;
 }
 
 
 void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     if (len < 256) {
-        nx_insert_8(msg, 0xa0);  // vbin8
-        nx_insert_8(msg, (uint8_t) len);
+        nx_insert_8(content, 0xa0);  // vbin8
+        nx_insert_8(content, (uint8_t) len);
     } else {
-        nx_insert_8(msg, 0xb0);  // vbin32
-        nx_insert_32(msg, len);
+        nx_insert_8(content, 0xb0);  // vbin32
+        nx_insert_32(content, len);
     }
-    nx_insert(msg, start, len);
-    msg->count++;
+    nx_insert(content, start, len);
+    content->count++;
 }
 
 
 void nx_message_insert_string(nx_message_t *msg, const char *start)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     uint32_t len = strlen(start);
 
     if (len < 256) {
-        nx_insert_8(msg, 0xa1);  // str8-utf8
-        nx_insert_8(msg, (uint8_t) len);
-        nx_insert(msg, (const uint8_t*) start, len);
+        nx_insert_8(content, 0xa1);  // str8-utf8
+        nx_insert_8(content, (uint8_t) len);
+        nx_insert(content, (const uint8_t*) start, len);
     } else {
-        nx_insert_8(msg, 0xb1);  // str32-utf8
-        nx_insert_32(msg, len);
-        nx_insert(msg, (const uint8_t*) start, len);
+        nx_insert_8(content, 0xb1);  // str32-utf8
+        nx_insert_32(content, len);
+        nx_insert(content, (const uint8_t*) start, len);
     }
-    msg->count++;
+    content->count++;
 }
 
 
 void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value)
 {
-    nx_insert_8(msg, 0x98);  // uuid
-    nx_insert(msg, value, 16);
-    msg->count++;
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    nx_insert_8(content, 0x98);  // uuid
+    nx_insert(content, value, 16);
+    content->count++;
 }
 
 
 void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len)
 {
+    nx_message_content_t *content = MSG_CONTENT(msg);
     if (len < 256) {
-        nx_insert_8(msg, 0xa3);  // sym8
-        nx_insert_8(msg, (uint8_t) len);
-        nx_insert(msg, (const uint8_t*) start, len);
+        nx_insert_8(content, 0xa3);  // sym8
+        nx_insert_8(content, (uint8_t) len);
+        nx_insert(content, (const uint8_t*) start, len);
     } else {
-        nx_insert_8(msg, 0xb3);  // sym32
-        nx_insert_32(msg, len);
-        nx_insert(msg, (const uint8_t*) start, len);
+        nx_insert_8(content, 0xb3);  // sym32
+        nx_insert_32(content, len);
+        nx_insert(content, (const uint8_t*) start, len);
     }
-    msg->count++;
+    content->count++;
 }
 
 
 void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value)
 {
-    nx_insert_8(msg, 0x83);  // timestamp
-    nx_insert_64(msg, value);
-    msg->count++;
-}
-
-
-unsigned char *nx_buffer_base(nx_buffer_t *buf)
-{
-    return (unsigned char*) &buf[1];
+    nx_message_content_t *content = MSG_CONTENT(msg);
+    nx_insert_8(content, 0x83);  // timestamp
+    nx_insert_64(content, value);
+    content->count++;
 }
 
 
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
+void nx_message_begin_list(nx_message_t* msg)
 {
-    return ((unsigned char*) &buf[1]) + buf->size;
+    assert(0); // Not Implemented
 }
 
 
-size_t nx_buffer_capacity(nx_buffer_t *buf)
+void nx_message_end_list(nx_message_t* msg)
 {
-    return config->buffer_size - buf->size;
+    assert(0); // Not Implemented
 }
 
 
-size_t nx_buffer_size(nx_buffer_t *buf)
+void nx_message_begin_map(nx_message_t* msg)
 {
-    return buf->size;
+    assert(0); // Not Implemented
 }
 
 
-void nx_buffer_insert(nx_buffer_t *buf, size_t len)
+void nx_message_end_map(nx_message_t* msg)
 {
-    buf->size += len;
-    assert(buf->size <= config->buffer_size);
+    assert(0); // Not Implemented
 }
 

Copied: qpid/trunk/qpid/extras/nexus/src/message_private.h (from r1434735, qpid/trunk/qpid/extras/nexus/src/server_private.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/message_private.h?p2=qpid/trunk/qpid/extras/nexus/src/message_private.h&p1=qpid/trunk/qpid/extras/nexus/src/server_private.h&r1=1434735&r2=1442413&rev=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/server_private.h (original)
+++ qpid/trunk/qpid/extras/nexus/src/message_private.h Mon Feb  4 22:46:32 2013
@@ -1,5 +1,5 @@
-#ifndef __server_private_h__
-#define __server_private_h__ 1
+#ifndef __message_private_h__
+#define __message_private_h__ 1
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,77 +19,76 @@
  * under the License.
  */
 
-#include <qpid/nexus/server.h>
-#include <qpid/nexus/user_fd.h>
-#include <qpid/nexus/timer.h>
+#include <qpid/nexus/message.h>
 #include <qpid/nexus/alloc.h>
-#include <proton/driver.h>
+#include <qpid/nexus/threading.h>
 
-void nx_server_timer_pending_LH(nx_timer_t *timer);
-void nx_server_timer_cancel_LH(nx_timer_t *timer);
+/**
+ * Architecture of the message module:
+ *
+ *     +--------------+            +----------------------+
+ *     |              |            |                      |
+ *     | nx_message_t |----------->| nx_message_content_t |
+ *     |              |     +----->|                      |
+ *     +--------------+     |      +----------------------+
+ *                          |                |
+ *     +--------------+     |                |    +-------------+   +-------------+   +-------------+
+ *     |              |     |                +--->| nx_buffer_t |-->| nx_buffer_t |-->| nx_buffer_t |--/
+ *     | nx_message_t |-----+                     +-------------+   +-------------+   +-------------+
+ *     |              |
+ *     +--------------+
+ *
+ * The message module provides chained-fixed-sized-buffer storage of message content with multiple
+ * references.  If a message is received and is to be queued for multiple destinations, there is only
+ * one copy of the message content in memory but multiple lightweight references to the content.
+ *
+ */
 
+typedef struct {
+    nx_buffer_t *buffer;  // Buffer that contains the first octet of the field, null if the field is not present
+    size_t       offset;  // Offset in the buffer to the first octet
+    size_t       length;  // Length of the field or zero if unneeded
+    int          parsed;  // non-zero iff the buffer chain has been parsed to find this field
+} nx_field_location_t;
+
+
+// TODO - consider using pointers to nx_field_location_t below to save memory
+// TODO - we need a second buffer list for modified annotations and header
+//        There are three message scenarios:
+//            1) Received message is held and forwarded unmodified - single buffer list
+//            2) Received message is held and modified before forwarding - two buffer lists
+//            3) Message is composed internally - single buffer list
+
+typedef struct {
+    sys_mutex_t         *lock;
+    uint32_t             ref_count;                       // The number of qmessages referencing this
+    nx_buffer_list_t     buffers;                         // The buffer chain containing the message
+    pn_delivery_t       *in_delivery;                     // The delivery on which the message arrived
+    nx_field_location_t  section_message_header;          // The message header list
+    nx_field_location_t  section_delivery_annotation;     // The delivery annotation map
+    nx_field_location_t  section_message_annotation;      // The message annotation map
+    nx_field_location_t  section_message_properties;      // The message properties list
+    nx_field_location_t  section_application_properties;  // The application properties list
+    nx_field_location_t  section_body;                    // The message body: Data
+    nx_field_location_t  section_footer;                  // The footer
+    nx_field_location_t  field_user_id;                   // The string value of the user-id
+    nx_field_location_t  field_to;                        // The string value of the to field
+    nx_field_location_t  body;                            // The body of the message
+    nx_field_location_t  compose_length;
+    nx_field_location_t  compose_count;
+    uint32_t             length;
+    uint32_t             count;
+} nx_message_content_t;
+
+typedef struct {
+    DEQ_LINKS(nx_message_t);                              // Deq linkage that overlays the nx_message_t
+    nx_message_content_t *content;
+    pn_delivery_t        *out_delivery;
+} nx_message_pvt_t;
 
-typedef enum {
-    CONN_STATE_CONNECTING = 0,
-    CONN_STATE_SASL_CLIENT,
-    CONN_STATE_SASL_SERVER,
-    CONN_STATE_OPENING,
-    CONN_STATE_OPERATIONAL,
-    CONN_STATE_FAILED,
-    CONN_STATE_USER
-} conn_state_t;
-
-#define CONTEXT_NO_OWNER -1
-
-typedef enum {
-    CXTR_STATE_CONNECTING = 0,
-    CXTR_STATE_OPEN,
-    CXTR_STATE_FAILED
-} cxtr_state_t;
-
-
-struct nx_listener_t {
-    const nx_server_config_t *config;
-    void                     *context;
-    pn_listener_t            *pn_listener;
-};
-
-
-struct nx_connector_t {
-    cxtr_state_t              state;
-    const nx_server_config_t *config;
-    void                     *context;
-    nx_connection_t          *ctx;
-    nx_timer_t               *timer;
-    long                      delay;
-};
-
-
-struct nx_connection_t {
-    conn_state_t     state;
-    int              owner_thread;
-    int              enqueued;
-    pn_connector_t  *pn_cxtr;
-    pn_connection_t *pn_conn;
-    nx_listener_t   *listener;
-    nx_connector_t  *connector;
-    void            *context; // Copy of context from listener or connector
-    void            *user_context;
-    nx_user_fd_t    *ufd;
-};
-
-
-struct nx_user_fd_t {
-    void           *context;
-    int             fd;
-    pn_connector_t *pn_conn;
-};
-
-
-ALLOC_DECLARE(nx_listener_t);
-ALLOC_DECLARE(nx_connector_t);
-ALLOC_DECLARE(nx_connection_t);
-ALLOC_DECLARE(nx_user_fd_t);
+ALLOC_DECLARE(nx_message_t);
+ALLOC_DECLARE(nx_message_content_t);
 
+#define MSG_CONTENT(m) (((nx_message_pvt_t*) m)->content)
 
 #endif

Modified: qpid/trunk/qpid/extras/nexus/src/timer.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/timer.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/timer.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/timer.c Mon Feb  4 22:46:32 2013
@@ -21,15 +21,17 @@
 #include "server_private.h"
 #include <qpid/nexus/ctools.h>
 #include <qpid/nexus/threading.h>
+#include <qpid/nexus/alloc.h>
 #include <assert.h>
 #include <stdio.h>
 
 static sys_mutex_t     *lock;
-static nx_timer_list_t  free_list;
 static nx_timer_list_t  idle_timers;
 static nx_timer_list_t  scheduled_timers;
 static long             time_base;
 
+ALLOC_DECLARE(nx_timer_t);
+ALLOC_DEFINE(nx_timer_t);
 
 //=========================================================================
 // Private static functions
@@ -67,27 +69,21 @@ static void nx_timer_cancel_LH(nx_timer_
 
 nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context)
 {
-    nx_timer_t *timer;
+    nx_timer_t *timer = new_nx_timer_t();
+    if (!timer)
+        return 0;
+
+    DEQ_ITEM_INIT(timer);
+
+    timer->handler    = cb;
+    timer->context    = context;
+    timer->delta_time = 0;
+    timer->state      = TIMER_IDLE;
 
     sys_mutex_lock(lock);
-
-    timer = DEQ_HEAD(free_list);
-    if (timer) {
-        DEQ_REMOVE_HEAD(free_list);
-    } else {
-        timer = NEW(nx_timer_t);
-        DEQ_ITEM_INIT(timer);
-    }
-
-    if (timer) {
-        timer->handler    = cb;
-        timer->context    = context;
-        timer->delta_time = 0;
-        timer->state      = TIMER_IDLE;
-        DEQ_INSERT_TAIL(idle_timers, timer);
-    }
-
+    DEQ_INSERT_TAIL(idle_timers, timer);
     sys_mutex_unlock(lock);
+
     return timer;
 }
 
@@ -97,9 +93,10 @@ void nx_timer_free(nx_timer_t *timer)
     sys_mutex_lock(lock);
     nx_timer_cancel_LH(timer);
     DEQ_REMOVE(idle_timers, timer);
-    DEQ_INSERT_TAIL(free_list, timer);
-    timer->state = TIMER_FREE;
     sys_mutex_unlock(lock);
+
+    timer->state = TIMER_FREE;
+    free_nx_timer_t(timer);
 }
 
 
@@ -180,7 +177,6 @@ void nx_timer_cancel(nx_timer_t *timer)
 void nx_timer_initialize(sys_mutex_t *server_lock)
 {
     lock = server_lock;
-    DEQ_INIT(free_list);
     DEQ_INIT(idle_timers);
     DEQ_INIT(scheduled_timers);
     time_base = 0;

Modified: qpid/trunk/qpid/extras/nexus/tests/alloc_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/alloc_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/alloc_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/alloc_test.c Mon Feb  4 22:46:32 2013
@@ -30,7 +30,7 @@ typedef struct {
 nx_alloc_config_t config = {3, 7, 10};
 
 ALLOC_DECLARE(object_t);
-ALLOC_DEFINE_CONFIG(object_t, &config);
+ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config);
 
 
 static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)

Modified: qpid/trunk/qpid/extras/nexus/tests/message_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/message_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/message_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/message_test.c Mon Feb  4 22:46:32 2013
@@ -20,26 +20,18 @@
 #include "test_case.h"
 #include <stdio.h>
 #include <string.h>
-#include <qpid/nexus/message.h>
+#include "message_private.h"
 #include <qpid/nexus/iterator.h>
 #include <proton/message.h>
 
 
-static char* test_init(void *context)
-{
-    nx_allocator_initialize(nx_allocator_default_config());
-    nx_allocator_finalize();
-    return 0;
-}
-
-
 static char* test_send_to_messenger(void *context)
 {
-    nx_allocator_initialize(nx_allocator_default_config());
+    nx_message_t         *msg     = nx_allocate_message();
+    nx_message_content_t *content = MSG_CONTENT(msg);
 
-    nx_message_t *msg = nx_allocate_message();
     nx_message_compose_1(msg, "test_addr_0", 0);
-    nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
+    nx_buffer_t *buf = DEQ_HEAD(content->buffers);
     if (buf == 0) return "Expected a buffer in the test message";
 
     pn_message_t *pn_msg = pn_message();
@@ -52,15 +44,12 @@ static char* test_send_to_messenger(void
     pn_message_free(pn_msg);
     nx_free_message(msg);
 
-    nx_allocator_finalize();
     return 0;
 }
 
 
 static char* test_receive_from_messenger(void *context)
 {
-    nx_allocator_initialize(nx_allocator_default_config());
-
     pn_message_t *pn_msg = pn_message();
     pn_message_set_address(pn_msg, "test_addr_1");
 
@@ -70,12 +59,14 @@ static char* test_receive_from_messenger
     if (result != 0) return "Error in pn_message_encode";
     nx_buffer_insert(buf, size);
 
-    nx_message_t *msg = nx_allocate_message();
-    DEQ_INSERT_TAIL(msg->buffers, buf);
+    nx_message_t         *msg     = nx_allocate_message();
+    nx_message_content_t *content = MSG_CONTENT(msg);
+
+    DEQ_INSERT_TAIL(content->buffers, buf);
     int valid = nx_message_check(msg, NX_DEPTH_ALL);
     if (!valid) return "nx_message_check returns 'invalid'";
 
-    nx_field_iterator_t *iter = nx_message_field_to(msg);
+    nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
     if (iter == 0) return "Expected an iterator for the 'to' field";
 
     if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
@@ -84,15 +75,12 @@ static char* test_receive_from_messenger
     pn_message_free(pn_msg);
     nx_free_message(msg);
 
-    nx_allocator_finalize();
     return 0;
 }
 
 
 static char* test_insufficient_check_depth(void *context)
 {
-    nx_allocator_initialize(nx_allocator_default_config());
-
     pn_message_t *pn_msg = pn_message();
     pn_message_set_address(pn_msg, "test_addr_2");
 
@@ -102,17 +90,18 @@ static char* test_insufficient_check_dep
     if (result != 0) return "Error in pn_message_encode";
     nx_buffer_insert(buf, size);
 
-    nx_message_t *msg = nx_allocate_message();
-    DEQ_INSERT_TAIL(msg->buffers, buf);
+    nx_message_t         *msg     = nx_allocate_message();
+    nx_message_content_t *content = MSG_CONTENT(msg);
+
+    DEQ_INSERT_TAIL(content->buffers, buf);
     int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS);
     if (!valid) return "nx_message_check returns 'invalid'";
 
-    nx_field_iterator_t *iter = nx_message_field_to(msg);
+    nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
     if (iter) return "Expected no iterator for the 'to' field";
 
     nx_free_message(msg);
 
-    nx_allocator_finalize();
     return 0;
 }
 
@@ -121,7 +110,6 @@ int message_tests(void)
 {
     int result = 0;
 
-    TEST_CASE(test_init, 0);
     TEST_CASE(test_send_to_messenger, 0);
     TEST_CASE(test_receive_from_messenger, 0);
     TEST_CASE(test_insufficient_check_depth, 0);

Modified: qpid/trunk/qpid/extras/nexus/tests/timer_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/timer_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/timer_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/timer_test.c Mon Feb  4 22:46:32 2013
@@ -19,6 +19,7 @@
 
 #include <stdio.h>
 #include <qpid/nexus/timer.h>
+#include "alloc_private.h"
 #include "timer_private.h"
 #include "test_case.h"
 #include <qpid/nexus/threading.h>
@@ -341,6 +342,7 @@ static char* test_big(void *context)
 int timer_tests(void)
 {
     int result = 0;
+    nx_alloc_initialize();
 
     fire_mask = 0;
     DEQ_INIT(pending_timers);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org