You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:00 UTC

[qpid-dispatch] 14/32: Dataplane: Added a 5th message compose variant to provide: - optional properties - optional application-properties - optional body in the form of a buffer list - indication of receive-complete

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 02261342d835f8cf3a09e9e15121e43da5222ab8
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jun 3 18:02:25 2020 -0400

    Dataplane: Added a 5th message compose variant to provide: - optional properties - optional application-properties - optional body in the form of a buffer list - indication of receive-complete
---
 include/qpid/dispatch/message.h  |  6 +++
 src/adaptors/reference_adaptor.c | 80 ++++++++++++++++++++++++++++++----------
 src/message.c                    | 27 ++++++++++++++
 3 files changed, 93 insertions(+), 20 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 63f0b47..55c3dba 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -278,6 +278,12 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
 void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
 
+void qd_message_compose_5(qd_message_t        *msg,
+                          qd_composed_field_t *properties,
+                          qd_composed_field_t *application_properties,
+                          qd_buffer_list_t    *body,
+                          bool                 complete);
+
 /** Put string representation of a message suitable for logging in buffer.
  * @return buffer
  */
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index ea0ca6f..b6ea5b4 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -25,6 +25,8 @@
 #include <stdio.h>
 #include <inttypes.h>
 
+static char *address = "echo-service";
+
 typedef struct qdr_ref_adaptor_t {
     qdr_core_t             *core;
     qdr_protocol_adaptor_t *adaptor;
@@ -33,6 +35,7 @@ typedef struct qdr_ref_adaptor_t {
     qdr_connection_t       *conn;
     qdr_link_t             *out_link;
     qdr_link_t             *in_link;
+    char                   *reply_to;
 } qdr_ref_adaptor_t;
 
 
@@ -61,6 +64,7 @@ static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link
 static void qdr_ref_second_attach(void *context, qdr_link_t *link,
                                   qdr_terminus_t *source, qdr_terminus_t *target)
 {
+    qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
 #define TERM_SIZE 200
     char ftarget[TERM_SIZE];
     char fsource[TERM_SIZE];
@@ -79,6 +83,25 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
     }
 
     printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget);
+
+    if (link == adaptor->in_link) {
+        uint64_t        link_id;
+        qdr_terminus_t *target = qdr_terminus(0);
+
+        qdr_terminus_set_address(target, address);
+
+        adaptor->out_link = qdr_link_first_attach(adaptor->conn,
+                                                  QD_INCOMING,
+                                                  qdr_terminus(0),  //qdr_terminus_t   *source,
+                                                  target,           //qdr_terminus_t   *target,
+                                                  "ref.1",          //const char       *name,
+                                                  0,                //const char       *terminus_addr,
+                                                  &link_id);
+
+        qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
+        adaptor->reply_to = (char*) qd_iterator_copy(reply_iter);
+        printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to);
+    }
 }
 
 
@@ -95,16 +118,40 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
     
     printf("qdr_ref_flow: %d credits issued\n", credit);
 
-    qd_message_t *msg = qd_message();
-    DEQ_INIT(buffers);
-    buf = qd_buffer();
-    char *insert = (char*) qd_buffer_cursor(buf);
-    strcpy(insert, "Test Payload");
-    qd_buffer_insert(buf, 13);
-    DEQ_INSERT_HEAD(buffers, buf);
-    qd_message_compose_1(msg, "echo-service", &buffers);
-
-    qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
+    if (link == adaptor->out_link) {
+        qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+        qd_compose_start_list(props);
+        qd_compose_insert_null(props);                      // message-id
+        qd_compose_insert_null(props);                      // user-id
+        qd_compose_insert_null(props);                      // to
+        qd_compose_insert_null(props);                      // subject
+        qd_compose_insert_string(props, adaptor->reply_to); // reply-to
+        /*
+        qd_compose_insert_null(props);                      // correlation-id
+        qd_compose_insert_null(props);                      // content-type
+        qd_compose_insert_null(props);                      // content-encoding
+        qd_compose_insert_timestamp(props, 0);              // absolute-expiry-time
+        qd_compose_insert_timestamp(props, 0);              // creation-time
+        qd_compose_insert_null(props);                      // group-id
+        qd_compose_insert_uint(props, 0);                   // group-sequence
+        qd_compose_insert_null(props);                      // reply-to-group-id
+        */
+        qd_compose_end_list(props);
+
+        qd_message_t *msg = qd_message();
+        DEQ_INIT(buffers);
+        buf = qd_buffer();
+        char *insert = (char*) qd_buffer_cursor(buf);
+        strcpy(insert, "Test Payload");
+        qd_buffer_insert(buf, 13);
+        DEQ_INSERT_HEAD(buffers, buf);
+
+        qd_message_compose_5(msg, props, 0, &buffers, true);
+        qd_compose_free(props);
+
+        qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
+        qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver");
+    }
 }
 
 
@@ -208,16 +255,7 @@ static void on_startup(void *context)
     uint64_t link_id;
     qdr_terminus_t *dynamic_source = qdr_terminus(0);
     qdr_terminus_set_dynamic(dynamic_source);
-    qdr_terminus_t *target = qdr_terminus(0);
-    qdr_terminus_set_address(target, "echo-service");
-
-    adaptor->out_link = qdr_link_first_attach(adaptor->conn,
-                                              QD_INCOMING,
-                                              qdr_terminus(0),  //qdr_terminus_t   *source,
-                                              target,           //qdr_terminus_t   *target,
-                                              "ref.1",          //const char       *name,
-                                              0,                //const char       *terminus_addr,
-                                              &link_id);
+
     adaptor->in_link = qdr_link_first_attach(adaptor->conn,
                                              QD_OUTGOING,
                                              dynamic_source,   //qdr_terminus_t   *source,
@@ -246,6 +284,7 @@ static void on_activate(void *context)
 void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
 {
     qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t);
+    ZERO(adaptor);
     adaptor->core    = core;
     adaptor->adaptor = qdr_protocol_adaptor(core,
                                             "reference", // name
@@ -280,6 +319,7 @@ void qdr_ref_adaptor_final(void *adaptor_context)
     qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
     qd_timer_free(adaptor->startup_timer);
     qd_timer_free(adaptor->activate_timer);
+    free(adaptor->reply_to);
     free(adaptor);
 }
 
diff --git a/src/message.c b/src/message.c
index 2ce6890..81ba863 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2179,6 +2179,33 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
 }
 
 
+void qd_message_compose_5(qd_message_t        *msg,
+                          qd_composed_field_t *properties,
+                          qd_composed_field_t *application_properties,
+                          qd_buffer_list_t    *body,
+                          bool                 complete)
+{
+    qd_message_content_t *content                        = MSG_CONTENT(msg);
+    qd_buffer_list_t     *properties_buffers             = properties             ? qd_compose_buffers(properties)             : 0;
+    qd_buffer_list_t     *application_properties_buffers = application_properties ? qd_compose_buffers(application_properties) : 0;
+
+    DEQ_INIT(content->buffers);
+    if (properties_buffers)
+        DEQ_APPEND(content->buffers, (*properties_buffers));
+    if (application_properties_buffers)
+        DEQ_APPEND(content->buffers, (*application_properties_buffers));
+
+    if (body) {
+        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+        qd_compose_insert_binary_buffers(field, body);
+        DEQ_APPEND(content->buffers, (*qd_compose_buffers(field)));
+        qd_compose_free(field);
+    }
+
+    content->receive_complete = complete;
+}
+
+
 qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org