You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:16 UTC
[qpid-dispatch] 30/32: Dataplane: WIP
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 5bb50a1c6cd5571b81ab562ec6fe2360c46ce860
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Jul 17 14:04:01 2020 -0400
Dataplane: WIP
---
include/qpid/dispatch/message.h | 17 +------------
src/adaptors/reference_adaptor.c | 22 +++++++----------
src/message.c | 28 ++++------------------
src/message_private.h | 1 +
src/python_embedded.c | 2 +-
src/router_core/core_client_api.c | 2 +-
.../modules/test_hooks/core_test_hooks.c | 2 +-
7 files changed, 18 insertions(+), 56 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 5a0b42a..f9e61d1 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -275,26 +275,11 @@ ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char
// Convenience Functions
void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers);
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
+void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool complete);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
/**
- * Send a message with optional headers and an optional raw body with the option of starting
- * a streaming transfer.
- *
- * @param msg The message being composed
- * @param headers A composed field with 1 or more header sections (incl body performative) or NULL
- * @param body A buffer list of raw body content or NULL
- * @param complete True if the message is to be receive-complete.
- * False if more content will arrive later.
- */
-void qd_message_compose_5(qd_message_t *msg,
- qd_composed_field_t *headers,
- qd_buffer_list_t *body,
- bool complete);
-
-/**
* qd_message_extend
*
* Extend the content of a streaming message with more buffers.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 2e1d98e..a0032a1 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -152,8 +152,6 @@ static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error,
static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
{
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
- qd_buffer_list_t buffers;
- qd_buffer_t *buf;
printf("qdr_ref_flow: %d credits issued\n", credit);
@@ -177,15 +175,12 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
*/
qd_compose_end_list(props);
+ props = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, props);
+ qd_compose_insert_string(props, "Test Payload");
+
qd_message_t *msg = qd_message();
- DEQ_INIT(buffers);
- buf = qd_buffer();
- char *insert = (char*) qd_buffer_cursor(buf);
- memcpy(insert, "\x00\x53\x77\xa1\x0cTest Payload", 17);
- qd_buffer_insert(buf, 17);
- DEQ_INSERT_HEAD(buffers, buf);
-
- qd_message_compose_5(msg, props, &buffers, true);
+
+ qd_message_compose_2(msg, props, true);
qd_compose_free(props);
qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0);
@@ -205,7 +200,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
adaptor->streaming_message = qd_message();
- qd_message_compose_5(adaptor->streaming_message, props, 0, false);
+ qd_message_compose_2(adaptor->streaming_message, props, false);
qd_compose_free(props);
printf("qdr_ref_flow: Starting a streaming delivery\n");
@@ -409,6 +404,7 @@ static void on_stream(void *context)
qd_buffer_list_t buffer_list;
DEQ_INIT(buffer_list);
qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+ qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
//
// Compose a DATA performative for this section of the stream
@@ -423,7 +419,7 @@ static void on_stream(void *context)
qd_compose_free(field);
//
- // Notify the router that more data has arrived on the delivery
+ // Notify the router that more data is ready to be pushed out on the delivery
//
qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
}
@@ -493,4 +489,4 @@ void qdr_ref_adaptor_final(void *adaptor_context)
/**
* Declare the adaptor so that it will self-register on process startup.
*/
-//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
+QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final)
diff --git a/src/message.c b/src/message.c
index c00b909..2b758b0 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2224,14 +2224,14 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
}
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field)
+void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool complete)
{
qd_message_content_t *content = MSG_CONTENT(msg);
- content->receive_complete = true;
-
qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
- content->buffers = *field_buffers;
+ content->buffers = *field_buffers;
+ content->receive_complete = complete;
+
DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}
@@ -2264,26 +2264,6 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
}
-void qd_message_compose_5(qd_message_t *msg,
- qd_composed_field_t *headers,
- qd_buffer_list_t *body,
- bool complete)
-{
- qd_message_content_t *content = MSG_CONTENT(msg);
- qd_buffer_list_t *headers_buffers = headers ? qd_compose_buffers(headers) : 0;
-
- DEQ_INIT(content->buffers);
- if (headers_buffers)
- DEQ_APPEND(content->buffers, (*headers_buffers));
-
- if (body) {
- DEQ_APPEND(content->buffers, (*body));
- }
-
- content->receive_complete = complete;
-}
-
-
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
diff --git a/src/message_private.h b/src/message_private.h
index 47f8f3e..d2b62f7 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -135,6 +135,7 @@ typedef struct {
qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
int ma_phase; // phase for the override address
+ qd_field_location_t body_section; // Location of the current parsed body section
bool strip_annotations_in;
bool send_complete; // Has the message been completely received and completely sent?
bool tag_sent; // Tags are sent
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 7265eb2..9465929 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -764,7 +764,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
qd_message_t *msg = qd_message();
- qd_message_compose_2(msg, field);
+ qd_message_compose_2(msg, field, true);
qd_composed_field_t *ingress = qd_compose_subfield(0);
qd_compose_insert_string(ingress, qd_router_id(ioa->qd));
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 3b2b85c..5b52929 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -687,7 +687,7 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client,
} else if (req->app_properties) {
qd_message_compose_3(message, fld, req->app_properties);
} else {
- qd_message_compose_2(message, fld);
+ qd_message_compose_2(message, fld, true);
}
qd_compose_free(fld);
qd_compose_free(req->body);
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 263b9d9..a810d35 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -108,7 +108,7 @@ static void source_send(test_endpoint_t *ep, bool presettled)
qd_compose_insert_string(field, stringbuf);
dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
- qd_message_compose_2(msg, field);
+ qd_message_compose_2(msg, field, true);
qd_compose_free(field);
qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org