You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:16 UTC

[qpid-dispatch] 30/32: Dataplane: WIP

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 5bb50a1c6cd5571b81ab562ec6fe2360c46ce860
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Jul 17 14:04:01 2020 -0400

    Dataplane: WIP
---
 include/qpid/dispatch/message.h                    | 17 +------------
 src/adaptors/reference_adaptor.c                   | 22 +++++++----------
 src/message.c                                      | 28 ++++------------------
 src/message_private.h                              |  1 +
 src/python_embedded.c                              |  2 +-
 src/router_core/core_client_api.c                  |  2 +-
 .../modules/test_hooks/core_test_hooks.c           |  2 +-
 7 files changed, 18 insertions(+), 56 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 5a0b42a..f9e61d1 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -275,26 +275,11 @@ ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char
 
 // Convenience Functions
 void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers);
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
+void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool complete);
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
 void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
 
 /**
- * Send a message with optional headers and an optional raw body with the option of starting
- * a streaming transfer.
- *
- * @param msg The message being composed
- * @param headers A composed field with 1 or more header sections (incl body performative) or NULL
- * @param body A buffer list of raw body content or NULL
- * @param complete True if the message is to be receive-complete.
- *        False if more content will arrive later.
- */
-void qd_message_compose_5(qd_message_t        *msg,
-                          qd_composed_field_t *headers,
-                          qd_buffer_list_t    *body,
-                          bool                 complete);
-
-/**
  * qd_message_extend
  *
  * Extend the content of a streaming message with more buffers.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 2e1d98e..a0032a1 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -152,8 +152,6 @@ static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error,
 static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
 {
     qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
-    qd_buffer_list_t   buffers;
-    qd_buffer_t       *buf;
     
     printf("qdr_ref_flow: %d credits issued\n", credit);
 
@@ -177,15 +175,12 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
         */
         qd_compose_end_list(props);
 
+        props = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, props);
+        qd_compose_insert_string(props, "Test Payload");
+
         qd_message_t *msg = qd_message();
-        DEQ_INIT(buffers);
-        buf = qd_buffer();
-        char *insert = (char*) qd_buffer_cursor(buf);
-        memcpy(insert, "\x00\x53\x77\xa1\x0cTest Payload", 17);
-        qd_buffer_insert(buf, 17);
-        DEQ_INSERT_HEAD(buffers, buf);
-
-        qd_message_compose_5(msg, props, &buffers, true);
+
+        qd_message_compose_2(msg, props, true);
         qd_compose_free(props);
 
         qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0);
@@ -205,7 +200,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
 
         adaptor->streaming_message = qd_message();
 
-        qd_message_compose_5(adaptor->streaming_message, props, 0, false);
+        qd_message_compose_2(adaptor->streaming_message, props, false);
         qd_compose_free(props);
 
         printf("qdr_ref_flow: Starting a streaming delivery\n");
@@ -409,6 +404,7 @@ static void on_stream(void *context)
         qd_buffer_list_t buffer_list;
         DEQ_INIT(buffer_list);
         qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+        qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
 
         //
         // Compose a DATA performative for this section of the stream
@@ -423,7 +419,7 @@ static void on_stream(void *context)
         qd_compose_free(field);
 
         //
-        // Notify the router that more data has arrived on the delivery
+        // Notify the router that more data is ready to be pushed out on the delivery
         //
         qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
     }
@@ -493,4 +489,4 @@ void qdr_ref_adaptor_final(void *adaptor_context)
 /**
  * Declare the adaptor so that it will self-register on process startup.
  */
-//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
+QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
diff --git a/src/message.c b/src/message.c
index c00b909..2b758b0 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2224,14 +2224,14 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
 }
 
 
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field)
+void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool complete)
 {
     qd_message_content_t *content       = MSG_CONTENT(msg);
-    content->receive_complete     = true;
-
     qd_buffer_list_t     *field_buffers = qd_compose_buffers(field);
 
-    content->buffers = *field_buffers;
+    content->buffers          = *field_buffers;
+    content->receive_complete = complete;
+
     DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
 }
 
@@ -2264,26 +2264,6 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
 }
 
 
-void qd_message_compose_5(qd_message_t        *msg,
-                          qd_composed_field_t *headers,
-                          qd_buffer_list_t    *body,
-                          bool                 complete)
-{
-    qd_message_content_t *content         = MSG_CONTENT(msg);
-    qd_buffer_list_t     *headers_buffers = headers ? qd_compose_buffers(headers) : 0;
-
-    DEQ_INIT(content->buffers);
-    if (headers_buffers)
-        DEQ_APPEND(content->buffers, (*headers_buffers));
-
-    if (body) {
-        DEQ_APPEND(content->buffers, (*body));
-    }
-
-    content->receive_complete = complete;
-}
-
-
 int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
 {
     qd_message_content_t *content = MSG_CONTENT(msg);
diff --git a/src/message_private.h b/src/message_private.h
index 47f8f3e..d2b62f7 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -135,6 +135,7 @@ typedef struct {
     qd_buffer_list_t      ma_trace;        // trace list in outgoing message annotations
     qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
     int                   ma_phase;        // phase for the override address
+    qd_field_location_t   body_section;    // Location of the current parsed body section
     bool                  strip_annotations_in;
     bool                  send_complete;   // Has the message been completely received and completely sent?
     bool                  tag_sent;        // Tags are sent
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 7265eb2..9465929 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -764,7 +764,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
 
     if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
         qd_message_t *msg = qd_message();
-        qd_message_compose_2(msg, field);
+        qd_message_compose_2(msg, field, true);
 
         qd_composed_field_t *ingress = qd_compose_subfield(0);
         qd_compose_insert_string(ingress, qd_router_id(ioa->qd));
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 3b2b85c..5b52929 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -687,7 +687,7 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client,
     } else if (req->app_properties) {
         qd_message_compose_3(message, fld, req->app_properties);
     } else {
-        qd_message_compose_2(message, fld);
+        qd_message_compose_2(message, fld, true);
     }
     qd_compose_free(fld);
     qd_compose_free(req->body);
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 263b9d9..a810d35 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -108,7 +108,7 @@ static void source_send(test_endpoint_t *ep, bool presettled)
     qd_compose_insert_string(field, stringbuf);
 
     dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
-    qd_message_compose_2(msg, field);
+    qd_message_compose_2(msg, field, true);
     qd_compose_free(field);
     qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org