You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/07/07 20:39:43 UTC
[06/14] qpid-dispatch git commit: DISPATCH-760: Message handling uses
new scheme
DISPATCH-760: Message handling uses new scheme
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/21014c6e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/21014c6e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/21014c6e
Branch: refs/heads/master
Commit: 21014c6e9f8dd2ea05551f341487a9d486c074cd
Parents: b67b201
Author: Chuck Rolke <cr...@redhat.com>
Authored: Sat Jul 1 15:01:56 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Jul 7 10:38:33 2017 -0400
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 46 +++++-
src/message.c | 299 +++++++++++++++++++++++++++--------
src/message_private.h | 16 +-
src/router_node.c | 70 ++++----
4 files changed, 313 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 523e150..d3f2bb4 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -133,16 +133,14 @@ void qd_message_free(qd_message_t *msg);
qd_message_t *qd_message_copy(qd_message_t *msg);
/**
- * Retrieve the message annotations from a message.
+ * Retrieve the message annotations from a message and place them in message storage.
*
* IMPORTANT: The parsed fields stored by this function remain owned by the message.
* Retrieve them with the qd_message_get_*() accessors; the caller MUST NOT free them.
*
* @param msg Pointer to a received message.
- * @return Pointer to the parsed field for the message annotations. If the message doesn't
- * have message annotations, the return value shall be NULL.
*/
-qd_parsed_field_t *qd_message_message_annotations(qd_message_t *msg);
+void qd_message_message_annotations(qd_message_t *msg);
/**
* Set the value for the QD_MA_TRACE field in the outgoing message annotations
@@ -258,6 +256,46 @@ int qd_message_repr_len();
qd_log_source_t* qd_message_log_source();
+/**
+ * Accessor for message field ingress
+ *
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg);
+
+/**
+ * Accessor for message field phase
+ *
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_phase (qd_message_t *msg);
+
+/**
+ * Accessor for message field to_override
+ *
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
+
+/**
+ * Accessor for message field trace
+ *
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_trace (qd_message_t *msg);
+
+/**
+ * Accessor for the integer value of message field phase
+ *
+ * @param msg A pointer to the message
+ * @return the phase as an integer
+ */
+int qd_message_get_phase_val (qd_message_t *msg);
+
///@}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 706458d..3e78930 100644
--- a/src/message.c
+++ b/src/message.c
@@ -23,6 +23,7 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/buffer.h>
#include <proton/object.h>
#include "message_private.h"
#include "compose_private.h"
@@ -33,6 +34,7 @@
#include <limits.h>
#include <time.h>
#include <inttypes.h>
+#include <assert.h>
const char *STR_AMQP_NULL = "null";
const char *STR_AMQP_TRUE = "T";
@@ -849,7 +851,6 @@ qd_message_t *qd_message()
msg->content->lock = sys_mutex();
sys_atomic_init(&msg->content->ref_count, 1);
msg->content->parse_depth = QD_DEPTH_NONE;
- msg->content->parsed_message_annotations = 0;
return (qd_message_t*) msg;
}
@@ -870,8 +871,16 @@ void qd_message_free(qd_message_t *in_msg)
rc = sys_atomic_dec(&content->ref_count) - 1;
if (rc == 0) {
- if (content->parsed_message_annotations)
- qd_parse_free(content->parsed_message_annotations);
+ if (content->ma_field_iter_in)
+ qd_iterator_free(content->ma_field_iter_in);
+ if (content->ma_pf_ingress)
+ qd_parse_free(content->ma_pf_ingress);
+ if (content->ma_pf_phase)
+ qd_parse_free(content->ma_pf_phase);
+ if (content->ma_pf_to_override)
+ qd_parse_free(content->ma_pf_to_override);
+ if (content->ma_pf_trace)
+ qd_parse_free(content->ma_pf_trace);
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
@@ -902,6 +911,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
copy->ma_phase = msg->ma_phase;
+ copy->is_interrouter = msg->is_interrouter;
copy->content = content;
@@ -912,30 +922,49 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
return (qd_message_t*) copy;
}
-qd_parsed_field_t *qd_message_message_annotations(qd_message_t *in_msg)
+void qd_message_message_annotations(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
- if (content->parsed_message_annotations)
- return content->parsed_message_annotations;
+ if (content->ma_parsed)
+ return ;
+ content->ma_parsed = true;
- qd_iterator_t *ma = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
- if (ma == 0)
- return 0;
+ content->ma_field_iter_in = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
+ if (content->ma_field_iter_in == 0)
+ return;
- content->parsed_message_annotations = qd_parse(ma);
- if (content->parsed_message_annotations == 0 ||
- !qd_parse_ok(content->parsed_message_annotations) ||
- !qd_parse_is_map(content->parsed_message_annotations)) {
- qd_iterator_free(ma);
- qd_parse_free(content->parsed_message_annotations);
- content->parsed_message_annotations = 0;
- return 0;
+ qd_parse_annotations(
+ msg->is_interrouter,
+ content->ma_field_iter_in,
+ &content->ma_pf_ingress,
+ &content->ma_pf_phase,
+ &content->ma_pf_to_override,
+ &content->ma_pf_trace,
+ &content->ma_user_annotation_blob,
+ &content->ma_count);
+
+ // Construct pseudo-field location of user annotations blob
+ // This holds all annotations if no router-specific annotations are present
+ if (content->ma_count > 0) {
+ qd_field_location_t *cf = &content->field_user_annotations;
+ qd_iterator_pointer_t *uab = &content->ma_user_annotation_blob;
+ cf->buffer = uab->buffer;
+ cf->offset = uab->cursor - qd_buffer_base(uab->buffer);
+ cf->length = uab->remaining;
+ cf->parsed = true;
+ if (content->ma_count > 4) {
+ //fprintf(stdout, "V2_DEV set ma_count to %d, len=%d\n", content->ma_count, (int)cf->length);
+ }
+ }
+
+ // extract phase
+ if (content->ma_pf_phase) {
+ content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
}
- qd_iterator_free(ma);
- return content->parsed_message_annotations;
+ return;
}
@@ -991,6 +1020,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
+ qd_link_t *qdl = (qd_link_t *)pn_link_get_context(link);
+ qd_connection_t *qdc = qd_link_connection(qdl);
+ msg->is_interrouter = qd_connection_is_interrouter(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
}
@@ -1070,40 +1102,78 @@ static void send_handler(void *context, const unsigned char *start, int length)
}
-// create a buffer chain holding the outgoing message annotations section
-static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out, bool strip_annotations)
+// compose_message_annotations
+//
+// Create a buffer chain holding the outgoing message annotations section.
+// Three types of outbound link are available:
+// V1 - 0.8.x routers that implement the original annotation scheme
+// V2 - 1.0.x routers that implement the new annotation scheme
+// V0 - End system clients that do not get any router annotations
+//
+// These routines do not strictly create the buffer chain in a ready-to-go form.
+// Rather hints are left to transmit the annotations in three sections:
+// 1. The optional map header returned in 'out'. This may hold:
+// a. empty if no user annotations and no router annotations
+// b. A map header if just user annotations
+// In case b the map header relies on the transmit routine to
+// send the opaque blob of user annotations.
+// 2. The optional blob of user annotations. The buffers for this section
+// are not copied into any buffer chain. Instead they are consumed
+// directly from the incoming message buffer chain.
+// These buffers are discovered in qd_message_message_annotations
+// and located using msg.content->ma_user_annotation_blob.
+// 3. The optional V1 annotation key-value pairs.
+// These are identified by the out_trailer buffer chain.
+//
+// Function qd_message_send must synthesize the annotations by sending
+// buffer chain 'out', the user's blob, and buffer chain 'out_trailer' as required.
+// The map in chain 'out' has been created with a gimmick that adjusts the size
+// of the map in the outbound message even though the data is not present in
+// buffer chain 'out'.
+static void compose_message_annotations_v0(qd_message_pvt_t *msg, qd_buffer_list_t *out)
{
qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
bool map_started = false;
- //We will have to add the custom annotations
- qd_parsed_field_t *in_ma = qd_parse_dup(msg->content->parsed_message_annotations);
- if (in_ma) {
- uint32_t count = qd_parse_sub_count(in_ma);
+ if (msg->content->ma_count > 0) {
+ // insert the incoming message remaining elements
+ if (!map_started) {
+ qd_compose_start_map(out_ma);
+ map_started = true;
+ }
- for (uint32_t idx = 0; idx < count; idx++) {
- qd_parsed_field_t *sub_key = qd_parse_sub_key(in_ma, idx);
- if (!sub_key)
- continue;
+ // Bump the map size and count to reflect user's blob.
+ // Note that the blob is not inserted here. This code adjusts the
+ // size/count of the map that is under construction and the content
+ // is inserted by router-node
+ qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
+ msg->content->field_user_annotations.length);
+ }
- qd_iterator_t *iter = qd_parse_raw(sub_key);
+ if (map_started) {
+ qd_compose_end_map(out_ma);
+ qd_compose_take_buffers(out_ma, out);
+ }
- if (!qd_iterator_prefix(iter, QD_MA_PREFIX)) {
- if (!map_started) {
- qd_compose_start_map(out_ma);
- map_started = true;
- }
- qd_parsed_field_t *sub_value = qd_parse_sub_value(in_ma, idx);
- qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_key));
- qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_value));
- }
- }
+ qd_compose_free(out_ma);
+}
- qd_parse_free(in_ma);
- }
- //Add the dispatch router specific annotations only if strip_annotations is false.
+static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list_t *out,
+ qd_buffer_list_t *out_trailer, bool strip_annotations)
+{
+ qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+
+ bool map_started = false;
+
+ // v1 annotations go into the out_trailer
+ int field_count = 0;
+    // Scratch composer that collects the v1 router annotation key/value pairs.
+    qd_composed_field_t *field = qd_compose_subfield(0);
+    if (!field) {
+        // Release the annotations composer created above before bailing out;
+        // otherwise it is leaked on this early-exit path.
+        qd_compose_free(out_ma);
+        return;
+    }
+
+ // If not stripping then add dispatch router specific annotations
if (!strip_annotations) {
if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
!DEQ_IS_EMPTY(msg->ma_trace) ||
@@ -1116,35 +1186,80 @@ static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
}
if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
- qd_compose_insert_symbol(out_ma, QD_MA_TO);
- qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+ qd_compose_insert_symbol(field, QD_MA_TO);
+ qd_compose_insert_buffers(field, &msg->ma_to_override);
+ field_count++;
}
if (!DEQ_IS_EMPTY(msg->ma_trace)) {
- qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
- qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+ qd_compose_insert_symbol(field, QD_MA_TRACE);
+ qd_compose_insert_buffers(field, &msg->ma_trace);
+ field_count++;
}
if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
- qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
- qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+ qd_compose_insert_symbol(field, QD_MA_INGRESS);
+ qd_compose_insert_buffers(field, &msg->ma_ingress);
+ field_count++;
}
if (msg->ma_phase != 0) {
- qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
- qd_compose_insert_int(out_ma, msg->ma_phase);
+ qd_compose_insert_symbol(field, QD_MA_PHASE);
+ qd_compose_insert_int(field, msg->ma_phase);
+ field_count++;
}
}
}
+ if (msg->content->ma_count > 0) {
+ // insert the incoming message user blob
+ if (!map_started) {
+ qd_compose_start_map(out_ma);
+ map_started = true;
+ }
+
+ // Bump the map size and count to reflect user's blob.
+ // Note that the blob is not inserted here. This code adjusts the
+ // size/count of the map that is under construction and the content
+ // is inserted by router-node
+ qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
+ msg->content->field_user_annotations.length);
+ }
+
+ if (field_count > 0) {
+ if (!map_started) {
+ qd_compose_start_map(out_ma);
+ map_started = true;
+ }
+ qd_compose_insert_opaque_elements(out_ma, field_count * 2,
+ qd_buffer_list_length(&field->buffers));
+
+ }
+
if (map_started) {
qd_compose_end_map(out_ma);
qd_compose_take_buffers(out_ma, out);
+ qd_compose_take_buffers(field, out_trailer);
}
qd_compose_free(out_ma);
+ qd_compose_free(field);
+}
+
+
+// Build the buffer chain(s) for the outgoing message annotations section.
+//
+// Dispatches on the kind of peer:
+//   - inter-router peer: compose_message_annotations_v1, which fills both
+//     'out' and 'out_trailer' and honors 'strip_annotations';
+//   - any other peer:    compose_message_annotations_v0, which fills only 'out'.
+static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out,
+                                        qd_buffer_list_t *out_trailer,
+                                        bool strip_annotations, bool is_interrouter)
+{
+    if (!is_interrouter) {
+        compose_message_annotations_v0(msg, out);
+        return;
+    }
+
+    compose_message_annotations_v1(msg, out, out_trailer, strip_annotations);
+}
+
void qd_message_send(qd_message_t *in_msg,
qd_link_t *link,
bool strip_annotations)
@@ -1154,28 +1269,16 @@ void qd_message_send(qd_message_t *in_msg,
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
unsigned char *cursor;
pn_link_t *pnl = qd_link_pn(link);
+ qd_connection_t *qdc = qd_link_connection(link);
+ bool is_rtr = qd_connection_is_interrouter(qdc);
qd_buffer_list_t new_ma;
+ qd_buffer_list_t new_ma_trailer;
DEQ_INIT(new_ma);
+ DEQ_INIT(new_ma_trailer);
// Process the message annotations if any
- compose_message_annotations(msg, &new_ma, strip_annotations);
-
- //
- // This is the case where the message annotations have been modified.
- // The message send must be divided into sections: The existing header;
- // the new message annotations; the rest of the existing message.
- // Note that the original message annotations that are still in the
- // buffer chain must not be sent.
- //
- // Start by making sure that we've parsed the message sections through
- // the message annotations
- //
- // ??? NO LONGER NECESSARY???
- if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
- qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
- return;
- }
+ compose_message_annotations(msg, &new_ma, &new_ma_trailer, strip_annotations, is_rtr);
//
// Send header if present
@@ -1201,7 +1304,7 @@ void qd_message_send(qd_message_t *in_msg,
}
//
- // Send new message annotations
+ // Send new message annotations map start if any
//
qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
while (da_buf) {
@@ -1212,6 +1315,29 @@ void qd_message_send(qd_message_t *in_msg,
qd_buffer_list_free_buffers(&new_ma);
//
+ // Annotations possibly include an opaque blob of user annotations
+ //
+    if (content->field_user_annotations.length > 0) {
+        qd_buffer_t   *buf2    = content->field_user_annotations.buffer;
+        // The stored offset is relative to the base of the blob's own buffer
+        // (buf2), not to 'buf', which tracks the walk through the message's
+        // buffer chain and may point at a different buffer.
+        unsigned char *cursor2 = qd_buffer_base(buf2) + content->field_user_annotations.offset;
+        advance(&cursor2, &buf2,
+                content->field_user_annotations.length,
+                send_handler, (void*) pnl);
+    }
+
+ //
+ // Annotations may include the v1 new_ma_trailer
+ //
+ qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer);
+ while (ta_buf) {
+ char *to_send = (char*) qd_buffer_base(ta_buf);
+ pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
+ ta_buf = DEQ_NEXT(ta_buf);
+ }
+ qd_buffer_list_free_buffers(&new_ma_trailer);
+
+
+ //
// Skip over replaced message annotations
//
if (content->section_message_annotation.length > 0)
@@ -1472,9 +1598,13 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
qd_compose_end_list(field);
qd_buffer_list_t out_ma;
+ qd_buffer_list_t out_ma_trailer;
DEQ_INIT(out_ma);
- compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, false);
+ DEQ_INIT(out_ma_trailer);
+ compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, &out_ma_trailer, false, true);
qd_compose_insert_buffers(field, &out_ma);
+ // TODO: user annotation blob goes here
+ qd_compose_insert_buffers(field, &out_ma_trailer);
field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
qd_compose_start_list(field);
@@ -1530,3 +1660,32 @@ void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_com
}
}
+
+// Return the parsed 'ingress' annotation held in the message content
+// (content->ma_pf_ingress). The field stays owned by the message.
+qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    return pvt->content->ma_pf_ingress;
+}
+
+
+// Return the parsed 'phase' annotation held in the message content
+// (content->ma_pf_phase). The field stays owned by the message.
+qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    return pvt->content->ma_pf_phase;
+}
+
+
+// Return the parsed 'to override' annotation held in the message content
+// (content->ma_pf_to_override). The field stays owned by the message.
+qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    return pvt->content->ma_pf_to_override;
+}
+
+
+// Return the parsed 'trace' annotation held in the message content
+// (content->ma_pf_trace). The field stays owned by the message.
+qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    return pvt->content->ma_pf_trace;
+}
+
+
+// Return the integer phase value cached in the message content
+// (content->ma_int_phase).
+int qd_message_get_phase_val(qd_message_t *msg)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    return pvt->content->ma_int_phase;
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index d59f7a9..c2dc7e2 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -75,6 +75,7 @@ typedef struct {
qd_field_location_t section_application_properties; // The application properties list
qd_field_location_t section_body; // The message body: Data
qd_field_location_t section_footer; // The footer
+ qd_field_location_t field_user_annotations; // Opaque user message annotations, not a real field.
qd_field_location_t field_message_id; // The string value of the message-id
qd_field_location_t field_user_id; // The string value of the user-id
qd_field_location_t field_to; // The string value of the to field
@@ -93,7 +94,19 @@ typedef struct {
qd_buffer_t *parse_buffer;
unsigned char *parse_cursor;
qd_message_depth_t parse_depth;
- qd_parsed_field_t *parsed_message_annotations;
+
+ bool ma_parsed; // have parsed annotations in incoming message
+ qd_iterator_t *ma_field_iter_in; // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION
+
+ qd_iterator_pointer_t ma_user_annotation_blob; // Original user annotations
+ // with router annotations stripped
+ uint32_t ma_count; // Number of map elements in blob
+ // after the router fields stripped
+ qd_parsed_field_t *ma_pf_ingress;
+ qd_parsed_field_t *ma_pf_phase;
+ qd_parsed_field_t *ma_pf_to_override;
+ qd_parsed_field_t *ma_pf_trace;
+ int ma_int_phase;
} qd_message_content_t;
typedef struct {
@@ -103,6 +116,7 @@ typedef struct {
qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
int ma_phase; // phase for the override address
+ bool is_interrouter; // true if peer is another router
} qd_message_pvt_t;
ALLOC_DECLARE(qd_message_t);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ae30831..a4146bd 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -93,49 +93,26 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
static qd_iterator_t *router_annotate_message(qd_router_t *router,
- qd_parsed_field_t *in_ma,
qd_message_t *msg,
qd_bitmask_t **link_exclusions,
bool strip_inbound_annotations)
{
qd_iterator_t *ingress_iter = 0;
- qd_parsed_field_t *trace = 0;
- qd_parsed_field_t *ingress = 0;
- qd_parsed_field_t *to = 0;
- qd_parsed_field_t *phase = 0;
+ bool s = strip_inbound_annotations;
- *link_exclusions = 0;
+ qd_parsed_field_t *trace = s ? 0 : qd_message_get_trace(msg);
+ qd_parsed_field_t *ingress = s ? 0 : qd_message_get_ingress(msg);
+ qd_parsed_field_t *to = s ? 0 : qd_message_get_to_override(msg);
+ qd_parsed_field_t *phase = s ? 0 : qd_message_get_phase(msg);
- if (in_ma && !strip_inbound_annotations) {
- uint32_t count = qd_parse_sub_count(in_ma);
- bool done = false;
-
- for (uint32_t idx = 0; idx < count && !done; idx++) {
- qd_parsed_field_t *sub = qd_parse_sub_key(in_ma, idx);
- if (!sub)
- continue;
- qd_iterator_t *iter = qd_parse_raw(sub);
- if (!iter)
- continue;
-
- if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TRACE)) {
- trace = qd_parse_sub_value(in_ma, idx);
- } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_INGRESS)) {
- ingress = qd_parse_sub_value(in_ma, idx);
- } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
- to = qd_parse_sub_value(in_ma, idx);
- } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
- phase = qd_parse_sub_value(in_ma, idx);
- }
- done = trace && ingress && to && phase;
- }
- }
+ *link_exclusions = 0;
//
// QD_MA_TRACE:
// If there is a trace field, append this router's ID to the trace.
// If the router ID is already in the trace the msg has looped.
+ // This code does not check for the loop condition.
//
qd_composed_field_t *trace_field = qd_compose_subfield(0);
qd_compose_start_list(trace_field);
@@ -182,8 +159,7 @@ static qd_iterator_t *router_annotate_message(qd_router_t *router,
// Preserve the existing value.
//
if (phase) {
- int phase_val = qd_parse_as_int(phase);
- qd_message_set_phase_annotation(msg, phase_val);
+ qd_message_set_phase_annotation(msg, qd_message_get_phase_val(msg));
}
//
@@ -265,8 +241,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
//
if (qdr_link_is_routed(rlink)) {
pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
- delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd), (uint8_t*) dtag.start, dtag.size,
- pn_disposition_type(pn_delivery_remote(pnd)), pn_disposition_data(pn_delivery_remote(pnd)));
+ delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd),
+ (uint8_t*) dtag.start, dtag.size,
+ pn_disposition_type(pn_delivery_remote(pnd)),
+ pn_disposition_data(pn_delivery_remote(pnd)));
if (delivery) {
if (pn_delivery_settled(pnd))
@@ -308,6 +286,13 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
// 'to' field. If the link is not anonymous, we don't need the 'to' field as we will be
// using the address from the link target.
//
+ // Validate the content of the delivery as an AMQP message. This is done partially, only
+ // to validate that we can find the fields we need to route the message.
+ //
+ // If the link is anonymous, we must validate through the message properties to find the
+ // 'to' field. If the link is not anonymous, we don't need the 'to' field as we will be
+ // using the address from the link target.
+ //
qd_message_depth_t validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
bool valid_message = qd_message_check(msg, validation_depth);
@@ -334,10 +319,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
}
}
- qd_parsed_field_t *in_ma = qd_message_message_annotations(msg);
+ qd_message_message_annotations(msg);
qd_bitmask_t *link_exclusions;
bool strip = qdr_link_strip_annotations_in(rlink);
- qd_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &link_exclusions, strip);
+
+ qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, strip);
if (anonymous_link) {
qd_iterator_t *addr_iter = 0;
@@ -346,12 +332,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
//
// If the message has message annotations, get the to-override field from the annotations.
//
- if (in_ma) {
- qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
- if (ma_to) {
- addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
- phase = qd_message_get_phase_annotation(msg);
- }
+ qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
+ if (ma_to) {
+ addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
+ phase = qd_message_get_phase_val(msg);
}
//
@@ -1047,7 +1031,7 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
// handle any delivery-state on the transfer e.g. transactional-state
qdr_delivery_write_extension_state(dlv, pdlv, true);
//
- // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
+ // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver.
//
bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org