You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:04 UTC
[qpid-dispatch] 18/32: Dataplane: Added calls in message.h for
streaming putput from adaptors. Renamed qdr_deliver_continue* to
qdr_delivery_continue*
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit a307cf433ee35c1f6be53ab9eae0b57225b2125a
Author: Ted Ross <tr...@apache.org>
AuthorDate: Thu Jun 4 15:22:29 2020 -0400
Dataplane: Added calls in message.h for streaming putput from adaptors. Renamed qdr_deliver_continue* to qdr_delivery_continue*
---
include/qpid/dispatch/message.h | 22 ++++++
src/adaptors/reference_adaptor.c | 141 ++++++++++++++++++++++++++++++---------
src/message.c | 16 +++++
src/router_core/connections.c | 4 +-
src/router_core/delivery.c | 20 +++---
src/router_core/delivery.h | 4 +-
src/router_node.c | 2 +-
7 files changed, 164 insertions(+), 45 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 29b2335..0f63b1a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -278,11 +278,27 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
+/**
+ * Send a message with optional headers and an optional raw body with the option of starting
+ * a streaming transfer.
+ *
+ * @param msg The message being composed
+ * @param headers A composed field with 1 or more header sections (incl body performative) or NULL
+ * @param body A buffer list of raw body content or NULL
+ * @param complete True if the message is to be receive-complete.
+ * False if more content will arrive later.
+ */
void qd_message_compose_5(qd_message_t *msg,
qd_composed_field_t *headers,
qd_buffer_list_t *body,
bool complete);
+/**
+ * Extend the content of a streaming message with more buffers.
+ */
+void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
+
+
/** Put string representation of a message suitable for logging in buffer.
* @return buffer
*/
@@ -369,6 +385,12 @@ void qd_message_set_send_complete(qd_message_t *msg);
/**
+ * Flag the message as being receive-complete.
+ */
+void qd_message_set_receive_complete(qd_message_t *msg);
+
+
+/**
* Returns true if the delivery tag has already been sent.
*/
bool qd_message_tag_sent(qd_message_t *msg);
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index c7ca07c..f227542 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -25,17 +25,23 @@
#include <stdio.h>
#include <inttypes.h>
-static char *address = "examples";
+static char *address1 = "examples";
+static char *address2 = "stream";
typedef struct qdr_ref_adaptor_t {
qdr_core_t *core;
qdr_protocol_adaptor_t *adaptor;
qd_timer_t *startup_timer;
qd_timer_t *activate_timer;
+ qd_timer_t *stream_timer;
qdr_connection_t *conn;
- qdr_link_t *out_link;
- qdr_link_t *in_link;
+ qdr_link_t *out_link_1;
+ qdr_link_t *out_link_2;
+ qdr_link_t *dynamic_in_link;
char *reply_to;
+ qd_message_t *streaming_message;
+ qdr_delivery_t *streaming_delivery;
+ int stream_count;
} qdr_ref_adaptor_t;
@@ -84,25 +90,39 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget);
- if (link == adaptor->in_link) {
- uint64_t link_id;
- qdr_terminus_t *target = qdr_terminus(0);
-
- qdr_terminus_set_address(target, address);
-
- adaptor->out_link = qdr_link_first_attach(adaptor->conn,
- QD_INCOMING,
- qdr_terminus(0), //qdr_terminus_t *source,
- target, //qdr_terminus_t *target,
- "ref.1", //const char *name,
- 0, //const char *terminus_addr,
- &link_id);
-
- qdr_link_flow(adaptor->core, adaptor->out_link, 10, false);
-
+ if (link == adaptor->dynamic_in_link) {
+ //
+ // The dynamic in-link has been attached. Get the reply-to address and open
+ // a couple of out-links.
+ //
qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
adaptor->reply_to = (char*) qd_iterator_copy(reply_iter);
printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to);
+
+ //
+ // Open an out-link for each address
+ //
+ uint64_t link_id;
+ qdr_terminus_t *target = qdr_terminus(0);
+
+ qdr_terminus_set_address(target, address1);
+ adaptor->out_link_1 = qdr_link_first_attach(adaptor->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "ref.1", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
+
+ target = qdr_terminus(0);
+ qdr_terminus_set_address(target, address2);
+ adaptor->out_link_2 = qdr_link_first_attach(adaptor->conn,
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "ref.2", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
}
}
@@ -120,7 +140,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
printf("qdr_ref_flow: %d credits issued\n", credit);
- if (link == adaptor->out_link) {
+ if (link == adaptor->out_link_1) {
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(props);
qd_compose_insert_null(props); // message-id
@@ -151,8 +171,34 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
qd_message_compose_5(msg, props, &buffers, true);
qd_compose_free(props);
- qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
- qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver");
+ qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0);
+ // Keep return-protection delivery reference as the adaptor's reference
+ } else if (link == adaptor->out_link_2) {
+ //
+ // Begin streaming a long message on the link.
+ //
+ qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+ qd_compose_start_list(props);
+ qd_compose_insert_null(props); // message-id
+ qd_compose_insert_null(props); // user-id
+ qd_compose_insert_null(props); // to
+ qd_compose_insert_null(props); // subject
+ qd_compose_insert_string(props, adaptor->reply_to); // reply-to
+ qd_compose_end_list(props);
+
+ props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
+
+ adaptor->streaming_message = qd_message();
+
+ qd_message_compose_5(adaptor->streaming_message, props, 0, false);
+ qd_compose_free(props);
+
+ adaptor->streaming_delivery =
+ qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0);
+ adaptor->stream_count = 0;
+ // Keep return-protection delivery reference as the adaptor's reference
+
+ qd_timer_schedule(adaptor->stream_timer, 1000);
}
}
@@ -210,7 +256,8 @@ static int qdr_ref_get_credit(void *context, qdr_link_t *link)
static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
{
- char *dispname;
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+ char *dispname;
switch (disp) {
case PN_ACCEPTED: dispname = "ACCEPTED"; break;
@@ -221,6 +268,9 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
dispname = "<UNKNOWN>";
}
printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false");
+
+ if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1)
+ qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery");
}
@@ -272,16 +322,20 @@ static void on_startup(void *context)
0); // bind_token
uint64_t link_id;
+
+ //
+ // Create a dynamic receiver
+ //
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
- adaptor->in_link = qdr_link_first_attach(adaptor->conn,
- QD_OUTGOING,
- dynamic_source, //qdr_terminus_t *source,
- qdr_terminus(0), //qdr_terminus_t *target,
- "ref.2", //const char *name,
- 0, //const char *terminus_addr,
- &link_id);
+ adaptor->dynamic_in_link = qdr_link_first_attach(adaptor->conn,
+ QD_OUTGOING,
+ dynamic_source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "ref.0", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
}
@@ -293,6 +347,32 @@ static void on_activate(void *context)
}
+static void on_stream(void *context)
+{
+ qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+ const char *content = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ const size_t content_length = strlen(content);
+ qd_buffer_t *buf = qd_buffer();
+ qd_buffer_list_t buffers;
+
+ DEQ_INIT(buffers);
+ DEQ_INSERT_TAIL(buffers, buf);
+
+ memcpy(qd_buffer_cursor(buf), content, content_length);
+ qd_buffer_insert(buf, content_length);
+ qd_message_extend(adaptor->streaming_message, &buffers);
+ qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
+
+ if (adaptor->stream_count < 30) {
+ qd_timer_schedule(adaptor->stream_timer, 1000);
+ adaptor->stream_count++;
+ } else {
+ qd_message_set_receive_complete(adaptor->streaming_message);
+ adaptor->streaming_message = 0;
+ }
+}
+
+
/**
* This initialization function will be invoked when the router core is ready for the protocol
* adaptor to be created. This function must:
@@ -329,6 +409,7 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
qd_timer_schedule(adaptor->startup_timer, 0);
adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor);
+ adaptor->stream_timer = qd_timer(core->qd, on_stream, adaptor);
}
diff --git a/src/message.c b/src/message.c
index be0c7a8..87c559e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1232,6 +1232,15 @@ void qd_message_set_send_complete(qd_message_t *in_msg)
}
+void qd_message_set_receive_complete(qd_message_t *in_msg)
+{
+ if (!!in_msg) {
+ qd_message_content_t *content = MSG_CONTENT(in_msg);
+ content->receive_complete = true;
+ }
+}
+
+
bool qd_message_tag_sent(qd_message_t *in_msg)
{
if (!in_msg)
@@ -2209,6 +2218,13 @@ void qd_message_compose_5(qd_message_t *msg,
}
+void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+{
+ qd_message_content_t *content = MSG_CONTENT(msg);
+ DEQ_APPEND(content->buffers, (*buffers));
+}
+
+
qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg)
{
return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index d6c7e40..88c4737 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -824,7 +824,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv, true);
- qdr_deliver_continue_peers_CT(core, dlv, false);
+ qdr_delivery_continue_peers_CT(core, dlv, false);
}
if (dlv->multicast) {
@@ -873,7 +873,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv, true);
- qdr_deliver_continue_peers_CT(core, dlv, false);
+ qdr_delivery_continue_peers_CT(core, dlv, false);
}
peer = qdr_delivery_first_peer_CT(dlv);
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 662ac2d..412683d 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -25,7 +25,7 @@ ALLOC_DEFINE(qdr_delivery_t);
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
qdr_delivery_t *peer, uint64_t new_disp, bool settled,
@@ -211,10 +211,10 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
}
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled)
+qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled)
{
- qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
+ qdr_action_t *action = qdr_action(qdr_delivery_continue_CT, "delivery_continue");
action->args.delivery.delivery = in_dlv;
qd_message_t *msg = qdr_delivery_message(in_dlv);
@@ -222,7 +222,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bo
action->args.delivery.presettled = settled;
// This incref is for the action reference
- qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
+ qdr_delivery_incref(in_dlv, "qdr_delivery_continue - add to action list");
qdr_action_enqueue(core, action);
return in_dlv;
}
@@ -1050,7 +1050,7 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
+void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
{
qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
@@ -1100,7 +1100,7 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, boo
}
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (discard)
return;
@@ -1123,7 +1123,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
// If it is already in the undelivered list, don't try to deliver this again.
//
if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- qdr_deliver_continue_peers_CT(core, in_dlv, more);
+ qdr_delivery_continue_peers_CT(core, in_dlv, more);
qd_message_t *msg = qdr_delivery_message(in_dlv);
@@ -1147,7 +1147,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
// We dont want to deal with such deliveries.
//
if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) {
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1");
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 1");
return;
}
@@ -1179,13 +1179,13 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
DEQ_REMOVE(link->settled, in_dlv);
// expect: action holds a ref to in_dlv, so it should not be freed here
assert(sys_atomic_get(&in_dlv->ref_count) > 1);
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from settled list");
}
}
}
// This decref is for the action reference
- qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2");
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 2");
}
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index ce8fd60..7e50efb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -124,7 +124,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
/* invoked when incoming message data arrives - schedule core thread */
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled);
+qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled);
//
@@ -153,7 +153,7 @@ qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
/* schedules all peer deliveries with work for I/O processing */
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
+void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
/* update the links counters with respect to its delivery */
void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
diff --git a/src/router_node.c b/src/router_node.c
index 95f9619..bd60542 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -436,7 +436,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
//
if (delivery) {
- qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd));
+ qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
return next_delivery;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org