You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2015/03/12 20:32:16 UTC
svn commit: r1666279 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/
tests/
Author: kgiusti
Date: Thu Mar 12 19:32:16 2015
New Revision: 1666279
URL: http://svn.apache.org/r1666279
Log:
DISPATCH-98: Refactor message annotation handling
Allows the various annotations to be set on a per-message basis.
Added:
qpid/dispatch/trunk/tests/buffer_test.c (with props)
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/buffer.h
qpid/dispatch/trunk/include/qpid/dispatch/compose.h
qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
qpid/dispatch/trunk/include/qpid/dispatch/message.h
qpid/dispatch/trunk/src/buffer.c
qpid/dispatch/trunk/src/compose.c
qpid/dispatch/trunk/src/message.c
qpid/dispatch/trunk/src/message_private.h
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/tests/CMakeLists.txt
qpid/dispatch/trunk/tests/compose_test.c
qpid/dispatch/trunk/tests/message_test.c
qpid/dispatch/trunk/tests/run_unit_tests_size.c
Modified: qpid/dispatch/trunk/include/qpid/dispatch/buffer.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/buffer.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/buffer.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/buffer.h Thu Mar 12 19:32:16 2015
@@ -92,6 +92,32 @@ size_t qd_buffer_size(qd_buffer_t *buf);
*/
void qd_buffer_insert(qd_buffer_t *buf, size_t len);
+/**
+ * Create a new buffer list by cloning an existing one.
+ *
+ * @param dst A pointer to a list to contain the new buffers
+ * @param src A pointer to an existing buffer list
+ * @return the number of bytes of data in the new chain
+ */
+unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const qd_buffer_list_t *src);
+
+/**
+ * Free all the buffers contained in a buffer list
+ *
+ * @param list A pointer to a list containing buffers. On return this list
+ * will be set to an empty list.
+ */
+void qd_buffer_list_free_buffers(qd_buffer_list_t *list);
+
+/**
+ * Return the total number of data bytes in a buffer list
+ *
+ * @param list A pointer to a list containing buffers.
+ * @return total number of bytes of data in the buffer list
+ */
+unsigned int qd_buffer_list_length(const qd_buffer_list_t *list);
+
+
///@}
#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/compose.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/compose.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/compose.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/compose.h Thu Mar 12 19:32:16 2015
@@ -209,6 +209,33 @@ void qd_compose_insert_symbol(qd_compose
*/
void qd_compose_insert_typed_iterator(qd_composed_field_t *field, qd_field_iterator_t *iter);
+/**
+ * Begin composing a new sub field that can be appended to a composed field.
+ *
+ * @param extend An existing field onto which to append the new field or NULL to
+ * create a standalone field.
+ * @return A pointer to the newly created field.
+ */
+qd_composed_field_t *qd_compose_subfield(qd_composed_field_t *extend);
+
+/**
+ * Steal the underlying buffers away from a composed field.
+ *
+ * @param field A composed field whose buffers will be taken
+ * @param list To hold the extracted buffers
+ */
+void qd_compose_take_buffers(qd_composed_field_t *field,
+ qd_buffer_list_t *list);
+/**
+ * Append a buffer list into a composed field. If field is a container type
+ * (list, map), the container's count will be incremented by one.
+ *
+ * @param field A field created by ::qd_compose().
+ * @param list A list of buffers containing a single completely encoded data
+ * object. Ownership of these buffers is given to field.
+ */
+void qd_compose_insert_buffers(qd_composed_field_t *field, qd_buffer_list_t *list);
+
///@}
#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/ctools.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/ctools.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/ctools.h Thu Mar 12 19:32:16 2015
@@ -45,6 +45,7 @@
#define DEQ_EMPTY {0,0,0,0}
#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
+#define DEQ_IS_EMPTY(d) ((d).head == 0)
#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
#define DEQ_HEAD(d) ((d).head)
#define DEQ_TAIL(d) ((d).tail)
@@ -156,4 +157,17 @@ do {
CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
} while (0)
+#define DEQ_APPEND(d1,d2) \
+do { \
+ if (!(d1).head) \
+ (d1) = (d2); \
+ else if ((d2).head) { \
+ (d1).tail->next = (d2).head; \
+ (d2).head->prev = (d1).tail; \
+ (d1).tail = (d2).tail; \
+ (d1).size += (d2).size; \
+ } \
+ DEQ_INIT(d2); \
+} while (0)
+
#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/message.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/message.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/message.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/message.h Thu Mar 12 19:32:16 2015
@@ -144,18 +144,52 @@ qd_message_t *qd_message_copy(qd_message
qd_parsed_field_t *qd_message_message_annotations(qd_message_t *msg);
/**
- * Set the message annotations for the message. If the message already has message annotations,
- * they will be overwritten/replaced by the new field.
+ * Set the value for the QD_MA_TRACE field in the outgoing message annotations
+ * for the message.
+ *
+ * IMPORTANT: This method takes ownership of the trace_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param trace_field Pointer to a composed field representing the list that
+ * will be used as the value for the QD_MA_TRACE map entry. If null, the
+ * message will not have a QA_MA_TRACE message annotation field. Ownership of
+ * this field is transferred to the message.
+ *
+ */
+void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *trace_field);
+
+/**
+ * Set the value for the QD_MA_TO field in the outgoing message annotations for
+ * the message.
+ *
+ * IMPORTANT: This method takes ownership of the to_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param to_field Pointer to a composed field representing the to overrid
+ * address that will be used as the value for the QD_MA_TO map entry. If null,
+ * the message will not have a QA_MA_TO message annotation field. Ownership of
+ * this field is transferred to the message.
+ *
+ */
+void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
+
+/**
+ * Set the value for the QD_MA_INGRESS field in the outgoing message
+ * annotations for the message.
+ *
+ * IMPORTANT: This method takes ownership of the ingress_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param ingress_field Pointer to a composed field representing ingress router
+ * that will be used as the value for the QD_MA_INGRESS map entry. If null,
+ * the message will not have a QA_MA_INGRESS message annotation field.
+ * Ownership of this field is transferred to the message.
*
- * @param msg Pointer to a receiver message.
- * @param da Pointer to a composed field representing the new message annotations of the message.
- * If null, the message will not have a message annotations field.
- * IMPORTANT: The message will not take ownership of the composed field. The
- * caller is responsible for freeing it after this call. Since the contents
- * are copied into the message, it is safe to free the composed field
- * any time after the call to this function.
*/
-void qd_message_set_message_annotations(qd_message_t *msg, qd_composed_field_t *da);
+void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field);
/**
* Receive message data via a delivery. This function may be called more than once on the same
Modified: qpid/dispatch/trunk/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/buffer.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/buffer.c (original)
+++ qpid/dispatch/trunk/src/buffer.c Thu Mar 12 19:32:16 2015
@@ -20,6 +20,8 @@
#include <qpid/dispatch/buffer.h>
#include "alloc.h"
+#include <string.h>
+
static size_t buffer_size = 512;
static int size_locked = 0;
@@ -81,3 +83,52 @@ void qd_buffer_insert(qd_buffer_t *buf,
buf->size += len;
assert(buf->size <= buffer_size);
}
+
+unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const qd_buffer_list_t *src)
+{
+ uint32_t len = 0;
+ DEQ_INIT(*dst);
+ qd_buffer_t *buf = DEQ_HEAD(*src);
+ while (buf) {
+ size_t to_copy = qd_buffer_size(buf);
+ unsigned char *src = qd_buffer_base(buf);
+ len += to_copy;
+ while (to_copy) {
+ qd_buffer_t *newbuf = qd_buffer();
+ size_t count = qd_buffer_capacity(newbuf);
+ // default buffer capacity may have changed,
+ // so don't assume it will fit:
+ if (count > to_copy) count = to_copy;
+ memcpy(qd_buffer_cursor(newbuf), src, count);
+ qd_buffer_insert(newbuf, count);
+ DEQ_INSERT_TAIL(*dst, newbuf);
+ src += count;
+ to_copy -= count;
+ }
+ buf = DEQ_NEXT(buf);
+ }
+ return len;
+}
+
+
+void qd_buffer_list_free_buffers(qd_buffer_list_t *list)
+{
+ qd_buffer_t *buf = DEQ_HEAD(*list);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*list);
+ qd_buffer_free(buf);
+ buf = DEQ_HEAD(*list);
+ }
+}
+
+
+unsigned int qd_buffer_list_length(const qd_buffer_list_t *list)
+{
+ unsigned int len = 0;
+ qd_buffer_t *buf = DEQ_HEAD(*list);
+ while (buf) {
+ len += qd_buffer_size(buf);
+ buf = DEQ_NEXT(buf);
+ }
+ return len;
+}
Modified: qpid/dispatch/trunk/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/compose.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/compose.c (original)
+++ qpid/dispatch/trunk/src/compose.c Thu Mar 12 19:32:16 2015
@@ -35,6 +35,14 @@ static void bump_count(qd_composed_field
comp->count++;
}
+static void bump_length(qd_composed_field_t *field,
+ uint32_t length)
+{
+ qd_composite_t *comp = DEQ_HEAD(field->fieldStack);
+ if (comp)
+ comp->length += length;
+}
+
static void qd_insert(qd_composed_field_t *field, const uint8_t *seq, size_t len)
{
@@ -185,7 +193,7 @@ static void qd_compose_end_composite(qd_
}
-qd_composed_field_t *qd_compose(uint64_t performative, qd_composed_field_t *extend)
+qd_composed_field_t *qd_compose_subfield(qd_composed_field_t *extend)
{
qd_composed_field_t *field = extend;
@@ -200,8 +208,18 @@ qd_composed_field_t *qd_compose(uint64_t
DEQ_INIT(field->fieldStack);
}
- qd_insert_8(field, 0x00);
- qd_compose_insert_ulong(field, performative);
+ return field;
+}
+
+
+qd_composed_field_t *qd_compose(uint64_t performative, qd_composed_field_t *extend)
+{
+ qd_composed_field_t *field = qd_compose_subfield(extend);
+
+ if (field) {
+ qd_insert_8(field, 0x00);
+ qd_compose_insert_ulong(field, performative);
+ }
return field;
}
@@ -392,6 +410,8 @@ void qd_compose_insert_binary_buffers(qd
DEQ_INSERT_TAIL(field->buffers, buf);
buf = DEQ_HEAD(*buffers);
}
+ bump_length(field, len);
+ bump_count(field);
}
@@ -470,3 +490,24 @@ qd_buffer_list_t *qd_compose_buffers(qd_
{
return &field->buffers;
}
+
+void qd_compose_take_buffers(qd_composed_field_t *field,
+ qd_buffer_list_t *list)
+{
+ // assumption: extracting partially built containers is wrong:
+ assert(DEQ_SIZE(field->fieldStack) == 0);
+ *list = *qd_compose_buffers(field);
+ DEQ_INIT(field->buffers); // Zero out the linkage to the now moved buffers.
+}
+
+void qd_compose_insert_buffers(qd_composed_field_t *field,
+ qd_buffer_list_t *list)
+{
+ uint32_t len = qd_buffer_list_length(list);
+ if (len) {
+ DEQ_APPEND(field->buffers, *list);
+ bump_length(field, len);
+ bump_count(field);
+ }
+}
+
Modified: qpid/dispatch/trunk/src/message.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Thu Mar 12 19:32:16 2015
@@ -536,6 +536,9 @@ qd_message_t *qd_message()
return 0;
DEQ_ITEM_INIT(msg);
+ DEQ_INIT(msg->ma_to_override);
+ DEQ_INIT(msg->ma_trace);
+ DEQ_INIT(msg->ma_ingress);
msg->content = new_qd_message_content_t();
if (msg->content == 0) {
@@ -558,6 +561,11 @@ void qd_message_free(qd_message_t *in_ms
if (!in_msg) return;
uint32_t rc;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+
+ qd_buffer_list_free_buffers(&msg->ma_to_override);
+ qd_buffer_list_free_buffers(&msg->ma_trace);
+ qd_buffer_list_free_buffers(&msg->ma_ingress);
+
qd_message_content_t *content = msg->content;
sys_mutex_lock(content->lock);
@@ -575,13 +583,6 @@ void qd_message_free(qd_message_t *in_ms
buf = DEQ_HEAD(content->buffers);
}
- buf = DEQ_HEAD(content->new_message_annotations);
- while (buf) {
- DEQ_REMOVE_HEAD(content->new_message_annotations);
- qd_buffer_free(buf);
- buf = DEQ_HEAD(content->new_message_annotations);
- }
-
sys_mutex_free(content->lock);
free_qd_message_content_t(content);
}
@@ -600,6 +601,10 @@ qd_message_t *qd_message_copy(qd_message
return 0;
DEQ_ITEM_INIT(copy);
+ qd_buffer_list_clone(©->ma_to_override, &msg->ma_to_override);
+ qd_buffer_list_clone(©->ma_trace, &msg->ma_trace);
+ qd_buffer_list_clone(©->ma_ingress, &msg->ma_ingress);
+
copy->content = content;
sys_mutex_lock(content->lock);
@@ -637,16 +642,29 @@ qd_parsed_field_t *qd_message_message_an
}
-void qd_message_set_message_annotations(qd_message_t *msg, qd_composed_field_t *da)
+void qd_message_set_trace_annotation(qd_message_t *in_msg, qd_composed_field_t *trace_field)
{
- qd_message_content_t *content = MSG_CONTENT(msg);
- qd_buffer_list_t *field_buffers = qd_compose_buffers(da);
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_buffer_list_free_buffers(&msg->ma_trace);
+ qd_compose_take_buffers(trace_field, &msg->ma_trace);
+ qd_compose_free(trace_field);
+}
- assert(DEQ_SIZE(content->new_message_annotations) == 0);
- content->new_message_annotations = *field_buffers;
- DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_field_t *to_field)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_buffer_list_free_buffers(&msg->ma_to_override);
+ qd_compose_take_buffers(to_field, &msg->ma_to_override);
+ qd_compose_free(to_field);
}
+void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_buffer_list_free_buffers(&msg->ma_ingress);
+ qd_compose_take_buffers(ingress_field, &msg->ma_ingress);
+ qd_compose_free(ingress_field);
+}
qd_message_t *qd_message_receive(qd_delivery_t *delivery)
{
@@ -740,6 +758,41 @@ static void send_handler(void *context,
pn_link_send(pnl, (const char*) start, length);
}
+// create a buffer chain holding the outgoing message annotations section
+static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out)
+{
+ DEQ_INIT(*out);
+ if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
+ !DEQ_IS_EMPTY(msg->ma_trace) ||
+ !DEQ_IS_EMPTY(msg->ma_ingress)) {
+
+ qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+ qd_compose_start_map(out_ma);
+
+ if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_TO);
+ qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+ }
+
+ if (!DEQ_IS_EMPTY(msg->ma_trace)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
+ qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+ }
+
+ if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+ qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+ }
+
+ qd_compose_end_map(out_ma);
+
+ qd_compose_take_buffers(out_ma, out);
+ qd_compose_free(out_ma);
+ return true;
+ }
+
+ return false;
+}
void qd_message_send(qd_message_t *in_msg, qd_link_t *link)
{
@@ -754,7 +807,8 @@ void qd_message_send(qd_message_t *in_ms
qd_message_repr(in_msg, repr, sizeof(repr)),
pn_link_name(pnl));
- if (DEQ_SIZE(content->new_message_annotations) > 0) {
+ qd_buffer_list_t new_ma;
+ if (compose_message_annotations(msg, &new_ma)) {
//
// This is the case where the message annotations have been modified.
// The message send must be divided into sections: The existing header;
@@ -765,6 +819,7 @@ void qd_message_send(qd_message_t *in_ms
// Start by making sure that we've parsed the message sections through
// the message annotations
//
+ // ??? NO LONGER NECESSARY???
if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
return;
@@ -785,11 +840,12 @@ void qd_message_send(qd_message_t *in_ms
//
// Send new message annotations
//
- qd_buffer_t *da_buf = DEQ_HEAD(content->new_message_annotations);
+ qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
while (da_buf) {
pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
da_buf = DEQ_NEXT(da_buf);
}
+ qd_buffer_list_free_buffers(&new_ma);
//
// Skip over replaced message annotations
@@ -1046,6 +1102,11 @@ void qd_message_compose_1(qd_message_t *
//qd_compose_insert_uint(field, 0); // delivery-count
qd_compose_end_list(field);
+ qd_buffer_list_t out_ma;
+ if (compose_message_annotations((qd_message_pvt_t*)msg, &out_ma)) {
+ qd_compose_insert_buffers(field, &out_ma);
+ }
+
field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
qd_compose_start_list(field);
qd_compose_insert_null(field); // message-id
@@ -1068,10 +1129,7 @@ void qd_message_compose_1(qd_message_t *
qd_compose_insert_binary_buffers(field, buffers);
}
- qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
- content->buffers = *field_buffers;
- DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
-
+ qd_compose_take_buffers(field, &content->buffers);
qd_compose_free(field);
}
Modified: qpid/dispatch/trunk/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message_private.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message_private.h (original)
+++ qpid/dispatch/trunk/src/message_private.h Thu Mar 12 19:32:16 2015
@@ -59,11 +59,6 @@ typedef struct {
// TODO - consider using pointers to qd_field_location_t below to save memory
-// TODO - we need a second buffer list for modified annotations and header
-// There are three message scenarios:
-// 1) Received message is held and forwarded unmodified - single buffer list
-// 2) Received message is held and modified before forwarding - two buffer lists
-// 3) Message is composed internally - single buffer list
// TODO - provide a way to allocate a message without a lock for the link-routing case.
// It's likely that link-routing will cause no contention for the message content.
//
@@ -72,7 +67,6 @@ typedef struct {
sys_mutex_t *lock;
uint32_t ref_count; // The number of messages referencing this
qd_buffer_list_t buffers; // The buffer chain containing the message
- qd_buffer_list_t new_message_annotations; // The buffer chain containing the new message annotations (MOVE TO MSG_PVT)
qd_field_location_t section_message_header; // The message header list
qd_field_location_t section_delivery_annotation; // The delivery annotation map
qd_field_location_t section_message_annotation; // The message annotation map
@@ -96,6 +90,10 @@ typedef struct {
typedef struct {
DEQ_LINKS(qd_message_t); // Deque linkage that overlays the qd_message_t
qd_message_content_t *content;
+ qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
+ qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
+ qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
+
} qd_message_pvt_t;
ALLOC_DECLARE(qd_message_t);
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Mar 12 19:32:16 2015
@@ -546,7 +546,6 @@ static qd_field_iterator_t *router_annot
int *drop,
const char *to_override)
{
- qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
qd_field_iterator_t *ingress_iter = 0;
qd_parsed_field_t *trace = 0;
@@ -554,62 +553,81 @@ static qd_field_iterator_t *router_annot
qd_parsed_field_t *to = 0;
if (in_ma) {
- trace = qd_parse_value_by_key(in_ma, QD_MA_TRACE);
- ingress = qd_parse_value_by_key(in_ma, QD_MA_INGRESS);
- to = qd_parse_value_by_key(in_ma, QD_MA_TO);
- }
-
- qd_compose_start_map(out_ma);
+ uint32_t count = qd_parse_sub_count(in_ma);
+ bool done = false;
- //
- // If there is a to_override provided, insert a TO field.
- //
- if (to_override) {
- qd_compose_insert_symbol(out_ma, QD_MA_TO);
- qd_compose_insert_string(out_ma, to_override);
- } else if (to) {
- qd_compose_insert_symbol(out_ma, QD_MA_TO);
- qd_compose_insert_string_iterator(out_ma, qd_parse_raw(to));
+ for (uint32_t idx = 0; idx < count && !done; idx++) {
+ qd_parsed_field_t *sub = qd_parse_sub_key(in_ma, idx);
+ if (!sub) continue;
+ qd_field_iterator_t *iter = qd_parse_raw(sub);
+ if (!iter) continue;
+
+ if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TRACE)) {
+ trace = qd_parse_sub_value(in_ma, idx);
+ } else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_INGRESS)) {
+ ingress = qd_parse_sub_value(in_ma, idx);
+ } else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TO)) {
+ to = qd_parse_sub_value(in_ma, idx);
+ }
+ done = trace && ingress && to;
+ }
}
//
+ // QD_MA_TRACE:
// If there is a trace field, append this router's ID to the trace.
+ // If the router ID is already in the trace the msg has looped.
//
- qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
- qd_compose_start_list(out_ma);
+ qd_composed_field_t *trace_field = qd_compose_subfield(0);
+ qd_compose_start_list(trace_field);
if (trace) {
if (qd_parse_is_list(trace)) {
uint32_t idx = 0;
qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
while (trace_item) {
qd_field_iterator_t *iter = qd_parse_raw(trace_item);
- if (qd_field_iterator_equal(iter, (unsigned char*) node_id))
+ if (qd_field_iterator_equal(iter, (unsigned char*) node_id)) {
*drop = 1;
- qd_compose_insert_string_iterator(out_ma, iter);
+ return 0; // no further processing necessary
+ }
+ qd_field_iterator_reset(iter);
+ qd_compose_insert_string_iterator(trace_field, iter);
idx++;
trace_item = qd_parse_sub_value(trace, idx);
}
}
}
+ qd_compose_insert_string(trace_field, node_id);
+ qd_compose_end_list(trace_field);
+ qd_message_set_trace_annotation(msg, trace_field);
- qd_compose_insert_string(out_ma, node_id);
- qd_compose_end_list(out_ma);
+ //
+ // QD_MA_TO:
+ // The supplied to override takes precedense over any existing
+ // value.
+ //
+ if (to_override) { // takes precedence over existing value
+ qd_composed_field_t *to_field = qd_compose_subfield(0);
+ qd_compose_insert_string(to_field, to_override);
+ qd_message_set_to_override_annotation(msg, to_field);
+ } else if (to) {
+ qd_composed_field_t *to_field = qd_compose_subfield(0);
+ qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
+ qd_message_set_to_override_annotation(msg, to_field);
+ }
//
- // If there is no ingress field, annotate the ingress as this router else
- // keep the original field.
+ // QD_MA_INGRESS:
+ // If there is no ingress field, annotate the ingress as
+ // this router else keep the original field.
//
- qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+ qd_composed_field_t *ingress_field = qd_compose_subfield(0);
if (ingress && qd_parse_is_scalar(ingress)) {
ingress_iter = qd_parse_raw(ingress);
- qd_compose_insert_string_iterator(out_ma, ingress_iter);
+ qd_compose_insert_string_iterator(ingress_field, ingress_iter);
} else
- qd_compose_insert_string(out_ma, node_id);
-
- qd_compose_end_map(out_ma);
-
- qd_message_set_message_annotations(msg, out_ma);
- qd_compose_free(out_ma);
+ qd_compose_insert_string(ingress_field, node_id);
+ qd_message_set_ingress_annotation(msg, ingress_field);
//
// Return the iterator to the ingress field _if_ it was present.
Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Thu Mar 12 19:32:16 2015
@@ -49,6 +49,7 @@ target_link_libraries(unit_tests qpid-di
set(unit_test_size_SOURCES
field_test.c
message_test.c
+ buffer_test.c
run_unit_tests_size.c
)
Added: qpid/dispatch/trunk/tests/buffer_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/buffer_test.c?rev=1666279&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/buffer_test.c (added)
+++ qpid/dispatch/trunk/tests/buffer_test.c Thu Mar 12 19:32:16 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <string.h>
+#include "test_case.h"
+#include <qpid/dispatch/buffer.h>
+
+
+static void fill_buffer(qd_buffer_list_t *list,
+ const unsigned char *data,
+ int length)
+{
+ DEQ_INIT(*list);
+ while (length > 0) {
+ qd_buffer_t *buf = qd_buffer();
+ size_t count = qd_buffer_capacity(buf);
+ if (length < count) count = length;
+ memcpy(qd_buffer_cursor(buf),
+ data, count);
+ qd_buffer_insert(buf, count);
+ DEQ_INSERT_TAIL(*list, buf);
+ data += count;
+ length -= count;
+ }
+}
+
+static int compare_buffer(const qd_buffer_list_t *list,
+ const unsigned char *data,
+ int length)
+{
+ qd_buffer_t *buf = DEQ_HEAD(*list);
+ while (buf && length > 0) {
+ size_t count = qd_buffer_size(buf);
+ if (length < count) count = length;
+ if (memcmp(qd_buffer_base(buf), data, count))
+ return 0;
+ length -= count;
+ data += count;
+ buf = DEQ_NEXT(buf);
+ }
+ return !buf && length == 0;
+}
+
+
+static const char pattern[] = "This piggy went 'wee wee wee' all the way home!";
+static const int pattern_len = sizeof(pattern);
+
+static char *test_buffer_list_clone(void *context)
+{
+ qd_buffer_list_t list;
+ fill_buffer(&list, (unsigned char *)pattern, pattern_len);
+ if (qd_buffer_list_length(&list) != pattern_len) return "Invalid fill?";
+
+ qd_buffer_list_t copy;
+ unsigned int len = qd_buffer_list_clone(©, &list);
+ if (len != pattern_len) return "Copy failed";
+
+ // 'corrupt' source buffer list:
+ *qd_buffer_base(DEQ_HEAD(list)) = (unsigned char)'X';
+ qd_buffer_list_free_buffers(&list);
+ if (!DEQ_IS_EMPTY(list)) return "List should be empty!";
+
+ // ensure copy is un-molested:
+ if (!compare_buffer(©, (unsigned char *)pattern, pattern_len)) return "Buffer list corrupted";
+
+ return 0;
+}
+
+
+int buffer_tests()
+{
+ int result = 0;
+
+ TEST_CASE(test_buffer_list_clone, 0);
+
+ return result;
+}
+
Propchange: qpid/dispatch/trunk/tests/buffer_test.c
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/tests/compose_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/compose_test.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/compose_test.c (original)
+++ qpid/dispatch/trunk/tests/compose_test.c Thu Mar 12 19:32:16 2015
@@ -278,6 +278,67 @@ static char *test_compose_scalars(void *
}
+// verify composition via a set of sub-fields
+static char *vector3 =
+ "\x00\x53\x72" // message annotations
+ "\xD1\x00\x00\x00\x1A\x00\x00\x00\x04" // map32, 26 bytes, 4 fields
+ "\xA1\x04Key1" // str8
+ "\x70\x00\x00\x03\xE7" // uint 999
+ "\xA1\x04Key2" // str8
+ "\x70\x00\x00\x03\x78"; // uint 888
+static int vector3_length = 34;
+
+static char *test_compose_subfields(void *context)
+{
+ qd_composed_field_t *sub1 = qd_compose_subfield(0);
+ qd_compose_insert_string(sub1, "Key1");
+ qd_composed_field_t *sub2 = qd_compose_subfield(0);
+ qd_compose_insert_uint(sub2, 999);
+
+ qd_composed_field_t *sub3 = qd_compose_subfield(0);
+ qd_compose_insert_string(sub3, "Key2");
+
+ //
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+ qd_compose_start_map(field);
+ qd_compose_insert_buffers(field, &sub1->buffers);
+ if (!DEQ_IS_EMPTY(sub1->buffers)) return "Buffer chain ownership not transferred!";
+ qd_compose_free(sub1);
+ qd_compose_insert_buffers(field, &sub2->buffers);
+ qd_compose_free(sub2);
+
+ qd_compose_insert_buffers(field, &sub3->buffers);
+ qd_compose_free(sub3);
+ qd_compose_insert_uint(field, 888);
+ qd_compose_end_map(field);
+
+ qd_buffer_list_t list;
+ qd_compose_take_buffers(field, &list);
+ if (!DEQ_IS_EMPTY(field->buffers)) return "Buffer list not removed!";
+
+ qd_compose_free(field);
+
+ if (qd_buffer_list_length(&list) != vector3_length)
+ return "Improper encoded length";
+
+ unsigned char *src = (unsigned char *)vector3;
+ qd_buffer_t *buf = DEQ_HEAD(list);
+ while (buf) {
+ unsigned char *c = qd_buffer_base(buf);
+ while (c != (qd_buffer_base(buf) + qd_buffer_size(buf))) {
+ if (*c != *src) return "Pattern Mismatch";
+ c++;
+ src++;
+ }
+ buf = DEQ_NEXT(buf);
+ }
+
+ qd_buffer_list_free_buffers(&list);
+
+ return 0;
+}
+
+
int compose_tests()
{
int result = 0;
@@ -285,6 +346,7 @@ int compose_tests()
TEST_CASE(test_compose_list_of_maps, 0);
TEST_CASE(test_compose_nested_composites, 0);
TEST_CASE(test_compose_scalars, 0);
+ TEST_CASE(test_compose_subfields, 0);
return result;
}
Modified: qpid/dispatch/trunk/tests/message_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/message_test.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/message_test.c (original)
+++ qpid/dispatch/trunk/tests/message_test.c Thu Mar 12 19:32:16 2015
@@ -22,6 +22,7 @@
#include <string.h>
#include "message_private.h"
#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/amqp.h>
#include <proton/message.h>
static char buffer[10000];
@@ -212,6 +213,82 @@ static char* test_check_multiple(void *c
}
+static char* test_send_message_annotations(void *context)
+{
+ qd_message_t *msg = qd_message();
+ qd_message_content_t *content = MSG_CONTENT(msg);
+
+ qd_composed_field_t *trace = qd_compose_subfield(0);
+ qd_compose_start_list(trace);
+ qd_compose_insert_string(trace, "Node1");
+ qd_compose_insert_string(trace, "Node2");
+ qd_compose_end_list(trace);
+ qd_message_set_trace_annotation(msg, trace);
+ qd_compose_free(trace);
+
+ qd_composed_field_t *to_override = qd_compose_subfield(0);
+ qd_compose_insert_string(to_override, "to/address");
+ qd_message_set_to_override_annotation(msg, to_override);
+ qd_compose_free(to_override);
+
+ qd_composed_field_t *ingress = qd_compose_subfield(0);
+ qd_compose_insert_string(ingress, "distress");
+ qd_message_set_ingress_annotation(msg, ingress);
+ qd_compose_free(ingress);
+
+ qd_message_compose_1(msg, "test_addr_0", 0);
+ qd_buffer_t *buf = DEQ_HEAD(content->buffers);
+ if (buf == 0) return "Expected a buffer in the test message";
+
+ pn_message_t *pn_msg = pn_message();
+ size_t len = flatten_bufs(content);
+ int result = pn_message_decode(pn_msg, buffer, len);
+ if (result != 0) return "Error in pn_message_decode";
+
+ pn_data_t *ma = pn_message_annotations(pn_msg);
+ if (!ma) return "Missing message annotations";
+ pn_data_rewind(ma);
+ pn_data_next(ma);
+ if (pn_data_type(ma) != PN_MAP) return "Invalid message annotation type";
+ if (pn_data_get_map(ma) != 6) return "Invalid map length";
+ pn_data_enter(ma);
+ for (int i = 0; i < 6; i+=2) {
+ pn_data_next(ma);
+ if (pn_data_type(ma) != PN_SYMBOL) return "Bad map index";
+ pn_bytes_t sym = pn_data_get_symbol(ma);
+ if (!strncmp(QD_MA_INGRESS, sym.start, sym.size)) {
+ pn_data_next(ma);
+ sym = pn_data_get_string(ma);
+ if (strncmp("distress", sym.start, sym.size)) return "Bad ingress";
+ //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
+ } else if (!strncmp(QD_MA_TO, sym.start, sym.size)) {
+ pn_data_next(ma);
+ sym = pn_data_get_string(ma);
+ if (strncmp("to/address", sym.start, sym.size)) return "Bad to override";
+ //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
+ } else if (!strncmp(QD_MA_TRACE, sym.start, sym.size)) {
+ pn_data_next(ma);
+ if (pn_data_type(ma) != PN_LIST) return "List not found";
+ pn_data_enter(ma);
+ pn_data_next(ma);
+ sym = pn_data_get_string(ma);
+ if (strncmp("Node1", sym.start, sym.size)) return "Bad trace entry";
+ //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
+ pn_data_next(ma);
+ sym = pn_data_get_string(ma);
+ if (strncmp("Node2", sym.start, sym.size)) return "Bad trace entry";
+ //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
+ pn_data_exit(ma);
+ } else return "Unexpected map key";
+ }
+
+ pn_message_free(pn_msg);
+ qd_message_free(msg);
+
+ return 0;
+}
+
+
int message_tests(void)
{
int result = 0;
@@ -220,6 +297,7 @@ int message_tests(void)
TEST_CASE(test_receive_from_messenger, 0);
TEST_CASE(test_message_properties, 0);
TEST_CASE(test_check_multiple, 0);
+ TEST_CASE(test_send_message_annotations, 0);
return result;
}
Modified: qpid/dispatch/trunk/tests/run_unit_tests_size.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/run_unit_tests_size.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/run_unit_tests_size.c (original)
+++ qpid/dispatch/trunk/tests/run_unit_tests_size.c Thu Mar 12 19:32:16 2015
@@ -22,6 +22,7 @@
int message_tests();
int field_tests();
+int buffer_tests();
int main(int argc, char** argv)
{
@@ -39,8 +40,9 @@ int main(int argc, char** argv)
int result = 0;
result += message_tests();
result += field_tests();
- qd_alloc_finalize();
+ result += buffer_tests();
+ qd_alloc_finalize();
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org