You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:00 UTC
[qpid-dispatch] 14/32: Dataplane: Added a 5th message compose
variant to provide: - optional properties - optional application-properties
- optional body in the form of a buffer list - indication of
receive-complete
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 02261342d835f8cf3a09e9e15121e43da5222ab8
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jun 3 18:02:25 2020 -0400
Dataplane: Added a 5th message compose variant to provide:
  - optional properties
  - optional application-properties
  - optional body in the form of a buffer list
  - indication of receive-complete
---
include/qpid/dispatch/message.h | 6 +++
src/adaptors/reference_adaptor.c | 80 ++++++++++++++++++++++++++++++----------
src/message.c | 27 ++++++++++++++
3 files changed, 93 insertions(+), 20 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 63f0b47..55c3dba 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -278,6 +278,12 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
+void qd_message_compose_5(qd_message_t *msg, // message whose content is composed from the sections below
+ qd_composed_field_t *properties, // optional properties section; pass 0 to omit
+ qd_composed_field_t *application_properties, // optional application-properties section; pass 0 to omit
+ qd_buffer_list_t *body, // optional body supplied as a buffer list; pass 0 to omit
+ bool complete); // indication of receive-complete for the composed message
+
/** Put string representation of a message suitable for logging in buffer.
* @return buffer
*/
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index ea0ca6f..b6ea5b4 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -25,6 +25,8 @@
#include <stdio.h>
#include <inttypes.h>
+static char *address = "echo-service";
+
typedef struct qdr_ref_adaptor_t {
qdr_core_t *core;
qdr_protocol_adaptor_t *adaptor;
@@ -33,6 +35,7 @@ typedef struct qdr_ref_adaptor_t {
qdr_connection_t *conn;
qdr_link_t *out_link;
qdr_link_t *in_link;
+ char *reply_to;
} qdr_ref_adaptor_t;
@@ -61,6 +64,7 @@ static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link
static void qdr_ref_second_attach(void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target)
{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
#define TERM_SIZE 200
char ftarget[TERM_SIZE];
char fsource[TERM_SIZE];
@@ -79,6 +83,25 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
}
printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget);
+
+ if (link == adaptor->in_link) {
+ uint64_t link_id;
+ qdr_terminus_t *target = qdr_terminus(0);
+
+ qdr_terminus_set_address(target, address);
+
+ adaptor->out_link = qdr_link_first_attach(adaptor->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "ref.1", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+
+ qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
+ adaptor->reply_to = (char*) qd_iterator_copy(reply_iter);
+ printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to);
+ }
}
@@ -95,16 +118,40 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
printf("qdr_ref_flow: %d credits issued\n", credit);
- qd_message_t *msg = qd_message();
- DEQ_INIT(buffers);
- buf = qd_buffer();
- char *insert = (char*) qd_buffer_cursor(buf);
- strcpy(insert, "Test Payload");
- qd_buffer_insert(buf, 13);
- DEQ_INSERT_HEAD(buffers, buf);
- qd_message_compose_1(msg, "echo-service", &buffers);
-
- qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
+ if (link == adaptor->out_link) {
+ qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+ qd_compose_start_list(props);
+ qd_compose_insert_null(props); // message-id
+ qd_compose_insert_null(props); // user-id
+ qd_compose_insert_null(props); // to
+ qd_compose_insert_null(props); // subject
+ qd_compose_insert_string(props, adaptor->reply_to); // reply-to
+ /*
+ qd_compose_insert_null(props); // correlation-id
+ qd_compose_insert_null(props); // content-type
+ qd_compose_insert_null(props); // content-encoding
+ qd_compose_insert_timestamp(props, 0); // absolute-expiry-time
+ qd_compose_insert_timestamp(props, 0); // creation-time
+ qd_compose_insert_null(props); // group-id
+ qd_compose_insert_uint(props, 0); // group-sequence
+ qd_compose_insert_null(props); // reply-to-group-id
+ */
+ qd_compose_end_list(props);
+
+ qd_message_t *msg = qd_message();
+ DEQ_INIT(buffers);
+ buf = qd_buffer();
+ char *insert = (char*) qd_buffer_cursor(buf);
+ strcpy(insert, "Test Payload");
+ qd_buffer_insert(buf, 13);
+ DEQ_INSERT_HEAD(buffers, buf);
+
+ qd_message_compose_5(msg, props, 0, &buffers, true);
+ qd_compose_free(props);
+
+ qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
+ qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver");
+ }
}
@@ -208,16 +255,7 @@ static void on_startup(void *context)
uint64_t link_id;
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
- qdr_terminus_t *target = qdr_terminus(0);
- qdr_terminus_set_address(target, "echo-service");
-
- adaptor->out_link = qdr_link_first_attach(adaptor->conn,
- QD_INCOMING,
- qdr_terminus(0), //qdr_terminus_t *source,
- target, //qdr_terminus_t *target,
- "ref.1", //const char *name,
- 0, //const char *terminus_addr,
- &link_id);
+
adaptor->in_link = qdr_link_first_attach(adaptor->conn,
QD_OUTGOING,
dynamic_source, //qdr_terminus_t *source,
@@ -246,6 +284,7 @@ static void on_activate(void *context)
void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
{
qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t);
+ ZERO(adaptor);
adaptor->core = core;
adaptor->adaptor = qdr_protocol_adaptor(core,
"reference", // name
@@ -280,6 +319,7 @@ void qdr_ref_adaptor_final(void *adaptor_context)
qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
qd_timer_free(adaptor->startup_timer);
qd_timer_free(adaptor->activate_timer);
+ free(adaptor->reply_to);
free(adaptor);
}
diff --git a/src/message.c b/src/message.c
index 2ce6890..81ba863 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2179,6 +2179,33 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
}
+void qd_message_compose_5(qd_message_t *msg, // Compose msg content from optional properties, optional application-properties and an optional body
+ qd_composed_field_t *properties, // may be 0, in which case no properties buffers are appended
+ qd_composed_field_t *application_properties, // may be 0, in which case no application-properties buffers are appended
+ qd_buffer_list_t *body, // may be 0; otherwise inserted as binary into a QD_PERFORMATIVE_BODY_DATA field
+ bool complete) // stored in the content's receive_complete flag
+{
+ qd_message_content_t *content = MSG_CONTENT(msg);
+ qd_buffer_list_t *properties_buffers = properties ? qd_compose_buffers(properties) : 0; // 0 when the section is omitted
+ qd_buffer_list_t *application_properties_buffers = application_properties ? qd_compose_buffers(application_properties) : 0; // 0 when the section is omitted
+
+ DEQ_INIT(content->buffers); // NOTE(review): nothing already in content->buffers is released here - confirm callers always pass a newly created message
+ if (properties_buffers)
+ DEQ_APPEND(content->buffers, (*properties_buffers)); // properties are appended first
+ if (application_properties_buffers)
+ DEQ_APPEND(content->buffers, (*application_properties_buffers)); // application-properties follow the properties
+
+ if (body) {
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); // temporary composed field that wraps the caller's body buffers
+ qd_compose_insert_binary_buffers(field, body);
+ DEQ_APPEND(content->buffers, (*qd_compose_buffers(field))); // the body section is appended last
+ qd_compose_free(field); // presumably safe because DEQ_APPEND moved the buffers out of field - TODO confirm
+ }
+
+ content->receive_complete = complete; // the caller decides whether the message is marked receive-complete
+}
+
+
qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg)
{
return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org