You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/02/04 23:46:33 UTC
svn commit: r1442413 [2/2] - in /qpid/trunk/qpid/extras/nexus: ./
include/qpid/nexus/ src/ tests/
Modified: qpid/trunk/qpid/extras/nexus/src/message.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/message.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/message.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/message.c Mon Feb 4 22:46:32 2013
@@ -17,51 +17,14 @@
* under the License.
*/
-#include <qpid/nexus/message.h>
#include <qpid/nexus/ctools.h>
#include <qpid/nexus/threading.h>
+#include "message_private.h"
#include <string.h>
#include <stdio.h>
-
-//
-// Per-Thread allocator
-//
-typedef struct nx_allocator_t {
- nx_message_list_t message_free_list;
- nx_buffer_list_t buffer_free_list;
-} nx_allocator_t;
-
-//
-// Global allocator (protected by a global lock)
-//
-typedef struct {
- nx_message_list_t message_free_list;
- nx_buffer_list_t buffer_free_list;
- sys_mutex_t *lock;
-} nx_global_allocator_t;
-
-static nx_global_allocator_t global;
-static nx_allocator_config_t default_config;
-static const nx_allocator_config_t *config;
-
-
-static nx_allocator_t *nx_get_allocator(void)
-{
- static __thread nx_allocator_t *alloc = 0;
-
- if (!alloc) {
- alloc = NEW(nx_allocator_t);
-
- if (!alloc)
- return 0;
-
- DEQ_INIT(alloc->message_free_list);
- DEQ_INIT(alloc->buffer_free_list);
- }
-
- return alloc;
-}
+ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0);
+ALLOC_DEFINE(nx_message_content_t);
static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume)
@@ -288,7 +251,7 @@ static int nx_check_and_advance(nx_buffe
}
-static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
+static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len)
{
nx_buffer_t *buf = DEQ_TAIL(msg->buffers);
@@ -312,13 +275,13 @@ static void nx_insert(nx_message_t *msg,
}
-static void nx_insert_8(nx_message_t *msg, uint8_t value)
+static void nx_insert_8(nx_message_content_t *msg, uint8_t value)
{
nx_insert(msg, &value, 1);
}
-static void nx_insert_32(nx_message_t *msg, uint32_t value)
+static void nx_insert_32(nx_message_content_t *msg, uint32_t value)
{
uint8_t buf[4];
buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
@@ -329,7 +292,7 @@ static void nx_insert_32(nx_message_t *m
}
-static void nx_insert_64(nx_message_t *msg, uint64_t value)
+static void nx_insert_64(nx_message_content_t *msg, uint64_t value)
{
uint8_t buf[8];
buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
@@ -371,7 +334,7 @@ static void nx_overwrite_32(nx_field_loc
}
-static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
+static void nx_start_list_performative(nx_message_content_t *msg, uint8_t code)
{
//
// Insert the short-form performative tag
@@ -409,219 +372,116 @@ static void nx_start_list_performative(n
}
-static void nx_end_list(nx_message_t *msg)
+static void nx_end_list(nx_message_content_t *msg)
{
nx_overwrite_32(&msg->compose_length, msg->length);
nx_overwrite_32(&msg->compose_count, msg->count);
}
-const nx_allocator_config_t *nx_allocator_default_config(void)
+nx_message_t *nx_allocate_message()
{
- default_config.buffer_size = 1024;
- default_config.buffer_preallocation_count = 512;
- default_config.buffer_rebalancing_batch_count = 16;
- default_config.buffer_local_storage_max = 64;
- default_config.buffer_free_list_max = 1000000;
- default_config.message_allocation_batch_count = 256;
- default_config.message_rebalancing_batch_count = 64;
- default_config.message_local_storage_max = 256;
-
- return &default_config;
-}
-
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t();
+ if (!msg)
+ return 0;
-void nx_allocator_initialize(const nx_allocator_config_t *c)
-{
- config = c;
+ DEQ_ITEM_INIT(msg);
+ msg->content = new_nx_message_content_t();
+ msg->out_delivery = 0;
- // Initialize the fields in the global structure.
- DEQ_INIT(global.message_free_list);
- DEQ_INIT(global.buffer_free_list);
- global.lock = sys_mutex();
+ if (msg->content == 0) {
+ free_nx_message_t((nx_message_t*) msg);
+ return 0;
+ }
- // Pre-allocate buffers according to the configuration
- int i;
- nx_buffer_t *buf;
+ memset(msg->content, 0, sizeof(nx_message_content_t));
+ msg->content->lock = sys_mutex();
+ msg->content->ref_count = 1;
- for (i = 0; i < config->buffer_preallocation_count; i++) {
- buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
- DEQ_ITEM_INIT(buf);
- DEQ_INSERT_TAIL(global.buffer_free_list, buf);
- }
+ return (nx_message_t*) msg;
}
-void nx_allocator_finalize(void)
+void nx_free_message(nx_message_t *in_msg)
{
- // TODO - Free buffers and messages
-}
+ uint32_t rc;
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ sys_mutex_lock(content->lock);
+ rc = --content->ref_count;
+ sys_mutex_unlock(content->lock);
-nx_message_t *nx_allocate_message(void)
-{
- nx_allocator_t *alloc = nx_get_allocator();
- nx_message_t *msg;
- int i;
+ if (rc == 0) {
+ nx_buffer_t *buf = DEQ_HEAD(content->buffers);
- if (DEQ_SIZE(alloc->message_free_list) == 0) {
- //
- // The local free list is empty, rebalance a batch of objects from the global
- // free list.
- //
- sys_mutex_lock(global.lock);
- if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) {
- for (i = 0; i < config->message_rebalancing_batch_count; i++) {
- msg = DEQ_HEAD(global.message_free_list);
- DEQ_REMOVE_HEAD(global.message_free_list);
- DEQ_INSERT_TAIL(alloc->message_free_list, msg);
- }
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->buffers);
+ nx_free_buffer(buf);
+ buf = DEQ_HEAD(content->buffers);
}
- sys_mutex_unlock(global.lock);
- }
- if (DEQ_SIZE(alloc->message_free_list) == 0) {
- //
- // The local free list is still empty. This means there were not enough objects on the
- // global free list to make up a batch. Allocate new objects from the heap and store
- // them in the local free list.
- //
- nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count);
- memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count);
- for (i = 0; i < config->message_allocation_batch_count; i++) {
- DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]);
- }
+ sys_mutex_free(content->lock);
+ free_nx_message_content_t(content);
}
- //
- // If the local free list is still empty, we're out of memory.
- //
- if (DEQ_SIZE(alloc->message_free_list) == 0)
- return 0;
-
- msg = DEQ_HEAD(alloc->message_free_list);
- DEQ_REMOVE_HEAD(alloc->message_free_list);
-
- DEQ_INIT(msg->buffers);
- msg->in_delivery = NULL;
- msg->out_delivery = NULL;
- msg->section_message_header.buffer = 0;
- msg->section_message_header.parsed = 0;
- msg->section_delivery_annotation.buffer = 0;
- msg->section_delivery_annotation.parsed = 0;
- msg->section_message_annotation.buffer = 0;
- msg->section_message_annotation.parsed = 0;
- msg->section_message_properties.buffer = 0;
- msg->section_message_properties.parsed = 0;
- msg->section_application_properties.buffer = 0;
- msg->section_application_properties.parsed = 0;
- msg->section_body.buffer = 0;
- msg->section_body.parsed = 0;
- msg->section_footer.buffer = 0;
- msg->section_footer.parsed = 0;
- msg->field_user_id.buffer = 0;
- msg->field_user_id.parsed = 0;
- msg->field_to.buffer = 0;
- msg->field_to.parsed = 0;
- msg->body.buffer = 0;
- msg->body.parsed = 0;
- return msg;
+ free_nx_message_t((nx_message_t*) msg);
}
-nx_buffer_t *nx_allocate_buffer(void)
-{
- nx_allocator_t *alloc = nx_get_allocator();
- nx_buffer_t *buf;
- int i;
-
- if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
- sys_mutex_lock(global.lock);
- if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) {
- // Rebalance a batch of free descriptors to the local free list.
- for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
- buf = DEQ_HEAD(global.buffer_free_list);
- DEQ_REMOVE_HEAD(global.buffer_free_list);
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- }
- }
- sys_mutex_unlock(global.lock);
- }
-
- if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
- // Allocate a buffer from the heap
- buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
- DEQ_ITEM_INIT(buf);
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- }
+nx_message_t *nx_message_copy(nx_message_t *in_msg)
+{
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ nx_message_pvt_t *copy = (nx_message_pvt_t*) new_nx_message_t();
- if (DEQ_SIZE(alloc->buffer_free_list) == 0)
+ if (!copy)
return 0;
- buf = DEQ_HEAD(alloc->buffer_free_list);
- DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+ DEQ_ITEM_INIT(copy);
+ copy->content = content;
+ copy->out_delivery = 0;
- buf->size = 0;
+ sys_mutex_lock(content->lock);
+ content->ref_count++;
+ sys_mutex_unlock(content->lock);
- return buf;
+ return (nx_message_t*) copy;
}
-void nx_free_message(nx_message_t *msg)
+void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery)
{
- nx_allocator_t *alloc = nx_get_allocator();
+ ((nx_message_pvt_t*) msg)->out_delivery = delivery;
+}
- // Free any buffers in the message
- int i;
- nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
- while (buf) {
- DEQ_REMOVE_HEAD(msg->buffers);
- nx_free_buffer(buf);
- buf = DEQ_HEAD(msg->buffers);
- }
- DEQ_INSERT_TAIL(alloc->message_free_list, msg);
- if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) {
- //
- // The local free list has exceeded the threshold for local storage.
- // Rebalance a batch of free objects to the global free list.
- //
- sys_mutex_lock(global.lock);
- for (i = 0; i < config->message_rebalancing_batch_count; i++) {
- msg = DEQ_HEAD(alloc->message_free_list);
- DEQ_REMOVE_HEAD(alloc->message_free_list);
- DEQ_INSERT_TAIL(global.message_free_list, msg);
- }
- sys_mutex_unlock(global.lock);
- }
+pn_delivery_t *nx_message_out_delivery(nx_message_t *msg)
+{
+ return ((nx_message_pvt_t*) msg)->out_delivery;
}
-void nx_free_buffer(nx_buffer_t *buf)
+void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery)
{
- nx_allocator_t *alloc = nx_get_allocator();
- int i;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ content->in_delivery = delivery;
+}
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) {
- // Rebalance a batch of free descriptors to the global free list.
- sys_mutex_lock(global.lock);
- for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
- buf = DEQ_HEAD(alloc->buffer_free_list);
- DEQ_REMOVE_HEAD(alloc->buffer_free_list);
- DEQ_INSERT_TAIL(global.buffer_free_list, buf);
- }
- sys_mutex_unlock(global.lock);
- }
+
+pn_delivery_t *nx_message_in_delivery(nx_message_t *msg)
+{
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ return content->in_delivery;
}
nx_message_t *nx_message_receive(pn_delivery_t *delivery)
{
- pn_link_t *link = pn_delivery_link(delivery);
- nx_message_t *msg = (nx_message_t*) pn_delivery_get_context(delivery);
- ssize_t rc;
- nx_buffer_t *buf;
+ pn_link_t *link = pn_delivery_link(delivery);
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) pn_delivery_get_context(delivery);
+ ssize_t rc;
+ nx_buffer_t *buf;
//
// If there is no message associated with the delivery, this is the first time
@@ -629,15 +489,16 @@ nx_message_t *nx_message_receive(pn_deli
// link it and the delivery together.
//
if (!msg) {
- msg = nx_allocate_message();
+ msg = (nx_message_pvt_t*) nx_allocate_message();
pn_delivery_set_context(delivery, (void*) msg);
//
// Record the incoming delivery only if it is not settled. If it is
- // settled, there's no need to propagate disposition back to the sender.
+ // settled, it should not be recorded as no future operations on it are
+ // permitted.
//
if (!pn_delivery_settled(delivery))
- msg->in_delivery = delivery;
+ msg->content->in_delivery = delivery;
}
//
@@ -645,10 +506,10 @@ nx_message_t *nx_message_receive(pn_deli
// we will store incoming message data. If there is no buffer in the message, allocate
// an empty one and add it to the message.
//
- buf = DEQ_TAIL(msg->buffers);
+ buf = DEQ_TAIL(msg->content->buffers);
if (!buf) {
buf = nx_allocate_buffer();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
while (1) {
@@ -667,10 +528,10 @@ nx_message_t *nx_message_receive(pn_deli
// of the buffer size.
//
if (nx_buffer_size(buf) == 0) {
- DEQ_REMOVE_TAIL(msg->buffers);
+ DEQ_REMOVE_TAIL(msg->content->buffers);
nx_free_buffer(buf);
}
- return msg;
+ return (nx_message_t*) msg;
}
if (rc > 0) {
@@ -686,7 +547,7 @@ nx_message_t *nx_message_receive(pn_deli
//
if (nx_buffer_capacity(buf) == 0) {
buf = nx_allocate_buffer();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
} else
//
@@ -697,11 +558,24 @@ nx_message_t *nx_message_receive(pn_deli
break;
}
- return NULL;
+ return 0;
}
-int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
+void nx_message_send(nx_message_t *in_msg, pn_link_t *link)
+{
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+
+ // TODO - Handle cases where annotations have been added or modified
+ while (buf) {
+ pn_link_send(link, (char*) nx_buffer_base(buf), nx_buffer_size(buf));
+ buf = DEQ_NEXT(buf);
+ }
+}
+
+
+int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth)
{
#define LONG 10
@@ -712,8 +586,8 @@ int nx_message_check(nx_message_t *msg,
#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71"
#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72"
-#define MESSAGE_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
-#define MESSAGE_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
+#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74"
#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
@@ -726,8 +600,10 @@ int nx_message_check(nx_message_t *msg,
#define TAGS_MAP (unsigned char*) "\xc1\xd1"
#define TAGS_BINARY (unsigned char*) "\xa0\xb0"
- nx_buffer_t *buffer = DEQ_HEAD(msg->buffers);
- unsigned char *cursor;
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ nx_buffer_t *buffer = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
if (!buffer)
return 0; // Invalid - No data in the message
@@ -740,9 +616,9 @@ int nx_message_check(nx_message_t *msg,
//
// MESSAGE HEADER
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &msg->section_message_header))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header))
return 0;
if (depth == NX_DEPTH_HEADER)
@@ -751,9 +627,9 @@ int nx_message_check(nx_message_t *msg,
//
// DELIVERY ANNOTATION
//
- if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_delivery_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_delivery_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation))
return 0;
if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS)
@@ -762,31 +638,31 @@ int nx_message_check(nx_message_t *msg,
//
// MESSAGE ANNOTATION
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_message_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_message_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation))
return 0;
if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS)
return 1;
//
- // MESSAGE PROPERTIES
+ // PROPERTIES
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG, LONG, TAGS_LIST, &msg->section_message_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties))
return 0;
- if (depth == NX_DEPTH_MESSAGE_PROPERTIES)
+ if (depth == NX_DEPTH_PROPERTIES)
return 1;
//
// APPLICATION PROPERTIES
//
- if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &msg->section_application_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties))
return 0;
if (depth == NX_DEPTH_APPLICATION_PROPERTIES)
@@ -795,13 +671,13 @@ int nx_message_check(nx_message_t *msg,
//
// BODY (Note that this function expects a single data section or a single AMQP sequence)
//
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body))
return 0;
if (depth == NX_DEPTH_BODY)
@@ -810,67 +686,72 @@ int nx_message_check(nx_message_t *msg,
//
// FOOTER
//
- if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &msg->section_footer))
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer))
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer))
return 0;
return 1;
}
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg)
+nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field)
{
- while (1) {
- if (msg->field_to.parsed)
- return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL);
+ nx_message_content_t *content = MSG_CONTENT(msg);
- if (msg->section_message_properties.parsed == 0)
- break;
+ switch (field) {
+ case NX_FIELD_TO:
+ while (1) {
+ if (content->field_to.parsed)
+ return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL);
- nx_buffer_t *buffer = msg->section_message_properties.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset;
+ if (content->section_message_properties.parsed == 0)
+ break;
- int count = start_list(&cursor, &buffer);
- int result;
+ nx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset;
- if (count < 3)
- break;
+ int count = start_list(&cursor, &buffer);
+ int result;
- result = traverse_field(&cursor, &buffer, 0); // message_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, 0); // user_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, &msg->field_to); // to
- if (!result) return 0;
- }
+ if (count < 3)
+ break;
- return 0;
-}
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_to); // to
+ if (!result) return 0;
+ }
+ break;
+ case NX_FIELD_BODY:
+ while (1) {
+ if (content->body.parsed)
+ return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL);
-nx_field_iterator_t *nx_message_body(nx_message_t *msg)
-{
- while (1) {
- if (msg->body.parsed)
- return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL);
+ if (content->section_body.parsed == 0)
+ break;
- if (msg->section_body.parsed == 0)
- break;
+ nx_buffer_t *buffer = content->section_body.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset;
+ int result;
- nx_buffer_t *buffer = msg->section_body.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset;
- int result;
+ result = traverse_field(&cursor, &buffer, &content->body);
+ if (!result) return 0;
+ }
+ break;
- result = traverse_field(&cursor, &buffer, &msg->body);
- if (!result) return 0;
+ default:
+ break;
}
return 0;
}
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain)
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers)
{
nx_message_begin_header(msg);
nx_message_insert_boolean(msg, 0); // durable
@@ -896,20 +777,20 @@ void nx_message_compose_1(nx_message_t *
//nx_message_insert_null(msg); // reply-to-group-id
nx_message_end_message_properties(msg);
- if (buf_chain)
- nx_message_append_body_data(msg, buf_chain);
+ if (buffers)
+ nx_message_append_body_data(msg, buffers);
}
void nx_message_begin_header(nx_message_t *msg)
{
- nx_start_list_performative(msg, 0x70);
+ nx_start_list_performative(MSG_CONTENT(msg), 0x70);
}
void nx_message_end_header(nx_message_t *msg)
{
- nx_end_list(msg);
+ nx_end_list(MSG_CONTENT(msg));
}
@@ -939,13 +820,13 @@ void nx_message_end_message_annotations(
void nx_message_begin_message_properties(nx_message_t *msg)
{
- nx_start_list_performative(msg, 0x73);
+ nx_start_list_performative(MSG_CONTENT(msg), 0x73);
}
void nx_message_end_message_properties(nx_message_t *msg)
{
- nx_end_list(msg);
+ nx_end_list(MSG_CONTENT(msg));
}
@@ -961,34 +842,40 @@ void nx_message_end_application_properti
}
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain)
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers)
{
- uint32_t len = 0;
- nx_buffer_t *buf = buf_chain;
- nx_buffer_t *last = 0;
- size_t count = 0;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_buffer_t *buf = DEQ_HEAD(*buffers);
+ uint32_t len = 0;
+ //
+ // Calculate the size of the body to be appended.
+ //
while (buf) {
len += nx_buffer_size(buf);
- count++;
- last = buf;
buf = DEQ_NEXT(buf);
}
- nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3);
+ //
+ // Insert a DATA section performative header.
+ //
+ nx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
if (len < 256) {
- nx_insert_8(msg, 0xa0); // vbin8
- nx_insert_8(msg, (uint8_t) len);
+ nx_insert_8(content, 0xa0); // vbin8
+ nx_insert_8(content, (uint8_t) len);
} else {
- nx_insert_8(msg, 0xb0); // vbin32
- nx_insert_32(msg, len);
+ nx_insert_8(content, 0xb0); // vbin32
+ nx_insert_32(content, len);
}
- if (len > 0) {
- buf_chain->prev = msg->buffers.tail;
- msg->buffers.tail->next = buf_chain;
- msg->buffers.tail = last;
- msg->buffers.size += count;
+ //
+ // Move the supplied buffers to the tail of the message's buffer list.
+ //
+ buf = DEQ_HEAD(*buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*buffers);
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ buf = DEQ_HEAD(*buffers);
}
}
@@ -1017,148 +904,151 @@ void nx_message_end_footer(nx_message_t
void nx_message_insert_null(nx_message_t *msg)
{
- nx_insert_8(msg, 0x40);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x40);
+ content->count++;
}
void nx_message_insert_boolean(nx_message_t *msg, int value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value)
- nx_insert(msg, (const uint8_t*) "\x56\x01", 2);
+ nx_insert(content, (const uint8_t*) "\x56\x01", 2);
else
- nx_insert(msg, (const uint8_t*) "\x56\x00", 2);
- msg->count++;
+ nx_insert(content, (const uint8_t*) "\x56\x00", 2);
+ content->count++;
}
void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value)
{
- nx_insert_8(msg, 0x50);
- nx_insert_8(msg, value);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x50);
+ nx_insert_8(content, value);
+ content->count++;
}
void nx_message_insert_uint(nx_message_t *msg, uint32_t value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value == 0) {
- nx_insert_8(msg, 0x43); // uint0
+ nx_insert_8(content, 0x43); // uint0
} else if (value < 256) {
- nx_insert_8(msg, 0x52); // smalluint
- nx_insert_8(msg, (uint8_t) value);
+ nx_insert_8(content, 0x52); // smalluint
+ nx_insert_8(content, (uint8_t) value);
} else {
- nx_insert_8(msg, 0x70); // uint
- nx_insert_32(msg, value);
+ nx_insert_8(content, 0x70); // uint
+ nx_insert_32(content, value);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_ulong(nx_message_t *msg, uint64_t value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value == 0) {
- nx_insert_8(msg, 0x44); // ulong0
+ nx_insert_8(content, 0x44); // ulong0
} else if (value < 256) {
- nx_insert_8(msg, 0x53); // smallulong
- nx_insert_8(msg, (uint8_t) value);
+ nx_insert_8(content, 0x53); // smallulong
+ nx_insert_8(content, (uint8_t) value);
} else {
- nx_insert_8(msg, 0x80); // ulong
- nx_insert_64(msg, value);
+ nx_insert_8(content, 0x80); // ulong
+ nx_insert_64(content, value);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (len < 256) {
- nx_insert_8(msg, 0xa0); // vbin8
- nx_insert_8(msg, (uint8_t) len);
+ nx_insert_8(content, 0xa0); // vbin8
+ nx_insert_8(content, (uint8_t) len);
} else {
- nx_insert_8(msg, 0xb0); // vbin32
- nx_insert_32(msg, len);
+ nx_insert_8(content, 0xb0); // vbin32
+ nx_insert_32(content, len);
}
- nx_insert(msg, start, len);
- msg->count++;
+ nx_insert(content, start, len);
+ content->count++;
}
void nx_message_insert_string(nx_message_t *msg, const char *start)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
uint32_t len = strlen(start);
if (len < 256) {
- nx_insert_8(msg, 0xa1); // str8-utf8
- nx_insert_8(msg, (uint8_t) len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xa1); // str8-utf8
+ nx_insert_8(content, (uint8_t) len);
+ nx_insert(content, (const uint8_t*) start, len);
} else {
- nx_insert_8(msg, 0xb1); // str32-utf8
- nx_insert_32(msg, len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xb1); // str32-utf8
+ nx_insert_32(content, len);
+ nx_insert(content, (const uint8_t*) start, len);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value)
{
- nx_insert_8(msg, 0x98); // uuid
- nx_insert(msg, value, 16);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x98); // uuid
+ nx_insert(content, value, 16);
+ content->count++;
}
void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (len < 256) {
- nx_insert_8(msg, 0xa3); // sym8
- nx_insert_8(msg, (uint8_t) len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xa3); // sym8
+ nx_insert_8(content, (uint8_t) len);
+ nx_insert(content, (const uint8_t*) start, len);
} else {
- nx_insert_8(msg, 0xb3); // sym32
- nx_insert_32(msg, len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xb3); // sym32
+ nx_insert_32(content, len);
+ nx_insert(content, (const uint8_t*) start, len);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value)
{
- nx_insert_8(msg, 0x83); // timestamp
- nx_insert_64(msg, value);
- msg->count++;
-}
-
-
-unsigned char *nx_buffer_base(nx_buffer_t *buf)
-{
- return (unsigned char*) &buf[1];
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x83); // timestamp
+ nx_insert_64(content, value);
+ content->count++;
}
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
+void nx_message_begin_list(nx_message_t* msg)
{
- return ((unsigned char*) &buf[1]) + buf->size;
+ assert(0); // Not Implemented
}
-size_t nx_buffer_capacity(nx_buffer_t *buf)
+void nx_message_end_list(nx_message_t* msg)
{
- return config->buffer_size - buf->size;
+ assert(0); // Not Implemented
}
-size_t nx_buffer_size(nx_buffer_t *buf)
+void nx_message_begin_map(nx_message_t* msg)
{
- return buf->size;
+ assert(0); // Not Implemented
}
-void nx_buffer_insert(nx_buffer_t *buf, size_t len)
+void nx_message_end_map(nx_message_t* msg)
{
- buf->size += len;
- assert(buf->size <= config->buffer_size);
+ assert(0); // Not Implemented
}
Copied: qpid/trunk/qpid/extras/nexus/src/message_private.h (from r1434735, qpid/trunk/qpid/extras/nexus/src/server_private.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/message_private.h?p2=qpid/trunk/qpid/extras/nexus/src/message_private.h&p1=qpid/trunk/qpid/extras/nexus/src/server_private.h&r1=1434735&r2=1442413&rev=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/server_private.h (original)
+++ qpid/trunk/qpid/extras/nexus/src/message_private.h Mon Feb 4 22:46:32 2013
@@ -1,5 +1,5 @@
-#ifndef __server_private_h__
-#define __server_private_h__ 1
+#ifndef __message_private_h__
+#define __message_private_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,77 +19,76 @@
* under the License.
*/
-#include <qpid/nexus/server.h>
-#include <qpid/nexus/user_fd.h>
-#include <qpid/nexus/timer.h>
+#include <qpid/nexus/message.h>
#include <qpid/nexus/alloc.h>
-#include <proton/driver.h>
+#include <qpid/nexus/threading.h>
-void nx_server_timer_pending_LH(nx_timer_t *timer);
-void nx_server_timer_cancel_LH(nx_timer_t *timer);
+/**
+ * Architecture of the message module:
+ *
+ * +--------------+ +----------------------+
+ * | | | |
+ * | nx_message_t |----------->| nx_message_content_t |
+ * | | +----->| |
+ * +--------------+ | +----------------------+
+ * | |
+ * +--------------+ | | +-------------+ +-------------+ +-------------+
+ * | | | +--->| nx_buffer_t |-->| nx_buffer_t |-->| nx_buffer_t |--/
+ * | nx_message_t |-----+ +-------------+ +-------------+ +-------------+
+ * | |
+ * +--------------+
+ *
+ * The message module provides chained-fixed-sized-buffer storage of message content with multiple
+ * references. If a message is received and is to be queued for multiple destinations, there is only
+ * one copy of the message content in memory but multiple lightweight references to the content.
+ *
+ */
+typedef struct {
+ nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
+} nx_field_location_t;
+
+
+// TODO - consider using pointers to nx_field_location_t below to save memory
+// TODO - we need a second buffer list for modified annotations and header
+// There are three message scenarios:
+// 1) Received message is held and forwarded unmodified - single buffer list
+// 2) Received message is held and modified before forwarding - two buffer lists
+// 3) Message is composed internally - single buffer list
+
+typedef struct {
+ sys_mutex_t *lock;
+ uint32_t ref_count; // The number of qmessages referencing this
+ nx_buffer_list_t buffers; // The buffer chain containing the message
+ pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ nx_field_location_t section_message_header; // The message header list
+ nx_field_location_t section_delivery_annotation; // The delivery annotation map
+ nx_field_location_t section_message_annotation; // The message annotation map
+ nx_field_location_t section_message_properties; // The message properties list
+ nx_field_location_t section_application_properties; // The application properties list
+ nx_field_location_t section_body; // The message body: Data
+ nx_field_location_t section_footer; // The footer
+ nx_field_location_t field_user_id; // The string value of the user-id
+ nx_field_location_t field_to; // The string value of the to field
+ nx_field_location_t body; // The body of the message
+ nx_field_location_t compose_length;
+ nx_field_location_t compose_count;
+ uint32_t length;
+ uint32_t count;
+} nx_message_content_t;
+
+typedef struct {
+ DEQ_LINKS(nx_message_t); // Deq linkage that overlays the nx_message_t
+ nx_message_content_t *content;
+ pn_delivery_t *out_delivery;
+} nx_message_pvt_t;
-typedef enum {
- CONN_STATE_CONNECTING = 0,
- CONN_STATE_SASL_CLIENT,
- CONN_STATE_SASL_SERVER,
- CONN_STATE_OPENING,
- CONN_STATE_OPERATIONAL,
- CONN_STATE_FAILED,
- CONN_STATE_USER
-} conn_state_t;
-
-#define CONTEXT_NO_OWNER -1
-
-typedef enum {
- CXTR_STATE_CONNECTING = 0,
- CXTR_STATE_OPEN,
- CXTR_STATE_FAILED
-} cxtr_state_t;
-
-
-struct nx_listener_t {
- const nx_server_config_t *config;
- void *context;
- pn_listener_t *pn_listener;
-};
-
-
-struct nx_connector_t {
- cxtr_state_t state;
- const nx_server_config_t *config;
- void *context;
- nx_connection_t *ctx;
- nx_timer_t *timer;
- long delay;
-};
-
-
-struct nx_connection_t {
- conn_state_t state;
- int owner_thread;
- int enqueued;
- pn_connector_t *pn_cxtr;
- pn_connection_t *pn_conn;
- nx_listener_t *listener;
- nx_connector_t *connector;
- void *context; // Copy of context from listener or connector
- void *user_context;
- nx_user_fd_t *ufd;
-};
-
-
-struct nx_user_fd_t {
- void *context;
- int fd;
- pn_connector_t *pn_conn;
-};
-
-
-ALLOC_DECLARE(nx_listener_t);
-ALLOC_DECLARE(nx_connector_t);
-ALLOC_DECLARE(nx_connection_t);
-ALLOC_DECLARE(nx_user_fd_t);
+ALLOC_DECLARE(nx_message_t);
+ALLOC_DECLARE(nx_message_content_t);
+#define MSG_CONTENT(m) (((nx_message_pvt_t*) m)->content)
#endif
Modified: qpid/trunk/qpid/extras/nexus/src/timer.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/timer.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/timer.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/timer.c Mon Feb 4 22:46:32 2013
@@ -21,15 +21,17 @@
#include "server_private.h"
#include <qpid/nexus/ctools.h>
#include <qpid/nexus/threading.h>
+#include <qpid/nexus/alloc.h>
#include <assert.h>
#include <stdio.h>
static sys_mutex_t *lock;
-static nx_timer_list_t free_list;
static nx_timer_list_t idle_timers;
static nx_timer_list_t scheduled_timers;
static long time_base;
+ALLOC_DECLARE(nx_timer_t);
+ALLOC_DEFINE(nx_timer_t);
//=========================================================================
// Private static functions
@@ -67,27 +69,21 @@ static void nx_timer_cancel_LH(nx_timer_
nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context)
{
- nx_timer_t *timer;
+ nx_timer_t *timer = new_nx_timer_t();
+ if (!timer)
+ return 0;
+
+ DEQ_ITEM_INIT(timer);
+
+ timer->handler = cb;
+ timer->context = context;
+ timer->delta_time = 0;
+ timer->state = TIMER_IDLE;
sys_mutex_lock(lock);
-
- timer = DEQ_HEAD(free_list);
- if (timer) {
- DEQ_REMOVE_HEAD(free_list);
- } else {
- timer = NEW(nx_timer_t);
- DEQ_ITEM_INIT(timer);
- }
-
- if (timer) {
- timer->handler = cb;
- timer->context = context;
- timer->delta_time = 0;
- timer->state = TIMER_IDLE;
- DEQ_INSERT_TAIL(idle_timers, timer);
- }
-
+ DEQ_INSERT_TAIL(idle_timers, timer);
sys_mutex_unlock(lock);
+
return timer;
}
@@ -97,9 +93,10 @@ void nx_timer_free(nx_timer_t *timer)
sys_mutex_lock(lock);
nx_timer_cancel_LH(timer);
DEQ_REMOVE(idle_timers, timer);
- DEQ_INSERT_TAIL(free_list, timer);
- timer->state = TIMER_FREE;
sys_mutex_unlock(lock);
+
+ timer->state = TIMER_FREE;
+ free_nx_timer_t(timer);
}
@@ -180,7 +177,6 @@ void nx_timer_cancel(nx_timer_t *timer)
void nx_timer_initialize(sys_mutex_t *server_lock)
{
lock = server_lock;
- DEQ_INIT(free_list);
DEQ_INIT(idle_timers);
DEQ_INIT(scheduled_timers);
time_base = 0;
Modified: qpid/trunk/qpid/extras/nexus/tests/alloc_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/alloc_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/alloc_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/alloc_test.c Mon Feb 4 22:46:32 2013
@@ -30,7 +30,7 @@ typedef struct {
nx_alloc_config_t config = {3, 7, 10};
ALLOC_DECLARE(object_t);
-ALLOC_DEFINE_CONFIG(object_t, &config);
+ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config);
static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
Modified: qpid/trunk/qpid/extras/nexus/tests/message_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/message_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/message_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/message_test.c Mon Feb 4 22:46:32 2013
@@ -20,26 +20,18 @@
#include "test_case.h"
#include <stdio.h>
#include <string.h>
-#include <qpid/nexus/message.h>
+#include "message_private.h"
#include <qpid/nexus/iterator.h>
#include <proton/message.h>
-static char* test_init(void *context)
-{
- nx_allocator_initialize(nx_allocator_default_config());
- nx_allocator_finalize();
- return 0;
-}
-
-
static char* test_send_to_messenger(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
- nx_message_t *msg = nx_allocate_message();
nx_message_compose_1(msg, "test_addr_0", 0);
- nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
+ nx_buffer_t *buf = DEQ_HEAD(content->buffers);
if (buf == 0) return "Expected a buffer in the test message";
pn_message_t *pn_msg = pn_message();
@@ -52,15 +44,12 @@ static char* test_send_to_messenger(void
pn_message_free(pn_msg);
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
static char* test_receive_from_messenger(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
-
pn_message_t *pn_msg = pn_message();
pn_message_set_address(pn_msg, "test_addr_1");
@@ -70,12 +59,14 @@ static char* test_receive_from_messenger
if (result != 0) return "Error in pn_message_encode";
nx_buffer_insert(buf, size);
- nx_message_t *msg = nx_allocate_message();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
int valid = nx_message_check(msg, NX_DEPTH_ALL);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field_to(msg);
+ nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
if (iter == 0) return "Expected an iterator for the 'to' field";
if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
@@ -84,15 +75,12 @@ static char* test_receive_from_messenger
pn_message_free(pn_msg);
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
static char* test_insufficient_check_depth(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
-
pn_message_t *pn_msg = pn_message();
pn_message_set_address(pn_msg, "test_addr_2");
@@ -102,17 +90,18 @@ static char* test_insufficient_check_dep
if (result != 0) return "Error in pn_message_encode";
nx_buffer_insert(buf, size);
- nx_message_t *msg = nx_allocate_message();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field_to(msg);
+ nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
if (iter) return "Expected no iterator for the 'to' field";
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
@@ -121,7 +110,6 @@ int message_tests(void)
{
int result = 0;
- TEST_CASE(test_init, 0);
TEST_CASE(test_send_to_messenger, 0);
TEST_CASE(test_receive_from_messenger, 0);
TEST_CASE(test_insufficient_check_depth, 0);
Modified: qpid/trunk/qpid/extras/nexus/tests/timer_test.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/tests/timer_test.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/tests/timer_test.c (original)
+++ qpid/trunk/qpid/extras/nexus/tests/timer_test.c Mon Feb 4 22:46:32 2013
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <qpid/nexus/timer.h>
+#include "alloc_private.h"
#include "timer_private.h"
#include "test_case.h"
#include <qpid/nexus/threading.h>
@@ -341,6 +342,7 @@ static char* test_big(void *context)
int timer_tests(void)
{
int result = 0;
+ nx_alloc_initialize();
fire_mask = 0;
DEQ_INIT(pending_timers);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org