You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:04 UTC

[qpid-dispatch] 18/32: Dataplane: Added calls in message.h for streaming output from adaptors. Renamed qdr_deliver_continue* to qdr_delivery_continue*

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit a307cf433ee35c1f6be53ab9eae0b57225b2125a
Author: Ted Ross <tr...@apache.org>
AuthorDate: Thu Jun 4 15:22:29 2020 -0400

    Dataplane: Added calls in message.h for streaming output from adaptors. Renamed qdr_deliver_continue* to qdr_delivery_continue*
---
 include/qpid/dispatch/message.h  |  22 ++++++
 src/adaptors/reference_adaptor.c | 141 ++++++++++++++++++++++++++++++---------
 src/message.c                    |  16 +++++
 src/router_core/connections.c    |   4 +-
 src/router_core/delivery.c       |  20 +++---
 src/router_core/delivery.h       |   4 +-
 src/router_node.c                |   2 +-
 7 files changed, 164 insertions(+), 45 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 29b2335..0f63b1a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -278,11 +278,27 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
 void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3);
 
+/**
+ * Send a message with optional headers and an optional raw body with the option of starting
+ * a streaming transfer.
+ *
+ * @param msg The message being composed
+ * @param headers A composed field with 1 or more header sections (incl body performative) or NULL
+ * @param body A buffer list of raw body content or NULL
+ * @param complete True if the message is to be receive-complete.
+ *        False if more content will arrive later.
+ */
 void qd_message_compose_5(qd_message_t        *msg,
                           qd_composed_field_t *headers,
                           qd_buffer_list_t    *body,
                           bool                 complete);
 
+/**
+ * Extend the content of a streaming message with more buffers.
+ */
+void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
+
+
 /** Put string representation of a message suitable for logging in buffer.
  * @return buffer
  */
@@ -369,6 +385,12 @@ void qd_message_set_send_complete(qd_message_t *msg);
 
 
 /**
+ * Flag the message as being receive-complete.
+ */
+void qd_message_set_receive_complete(qd_message_t *msg);
+
+
+/**
  * Returns true if the delivery tag has already been sent.
  */
 bool qd_message_tag_sent(qd_message_t *msg);
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index c7ca07c..f227542 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -25,17 +25,23 @@
 #include <stdio.h>
 #include <inttypes.h>
 
-static char *address = "examples";
+static char *address1 = "examples";
+static char *address2 = "stream";
 
 typedef struct qdr_ref_adaptor_t {
     qdr_core_t             *core;
     qdr_protocol_adaptor_t *adaptor;
     qd_timer_t             *startup_timer;
     qd_timer_t             *activate_timer;
+    qd_timer_t             *stream_timer;
     qdr_connection_t       *conn;
-    qdr_link_t             *out_link;
-    qdr_link_t             *in_link;
+    qdr_link_t             *out_link_1;
+    qdr_link_t             *out_link_2;
+    qdr_link_t             *dynamic_in_link;
     char                   *reply_to;
+    qd_message_t           *streaming_message;
+    qdr_delivery_t         *streaming_delivery;
+    int                     stream_count;
 } qdr_ref_adaptor_t;
 
 
@@ -84,25 +90,39 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
 
     printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget);
 
-    if (link == adaptor->in_link) {
-        uint64_t        link_id;
-        qdr_terminus_t *target = qdr_terminus(0);
-
-        qdr_terminus_set_address(target, address);
-
-        adaptor->out_link = qdr_link_first_attach(adaptor->conn,
-                                                  QD_INCOMING,
-                                                  qdr_terminus(0),  //qdr_terminus_t   *source,
-                                                  target,           //qdr_terminus_t   *target,
-                                                  "ref.1",          //const char       *name,
-                                                  0,                //const char       *terminus_addr,
-                                                  &link_id);
-
-        qdr_link_flow(adaptor->core, adaptor->out_link, 10, false);
-
+    if (link == adaptor->dynamic_in_link) {
+        //
+        // The dynamic in-link has been attached.  Get the reply-to address and open
+        // a couple of out-links.
+        //
         qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
         adaptor->reply_to = (char*) qd_iterator_copy(reply_iter);
         printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to);
+
+        //
+        // Open an out-link for each address
+        //
+        uint64_t        link_id;
+        qdr_terminus_t *target = qdr_terminus(0);
+
+        qdr_terminus_set_address(target, address1);
+        adaptor->out_link_1 = qdr_link_first_attach(adaptor->conn,
+                                                    QD_INCOMING,
+                                                    qdr_terminus(0),  //qdr_terminus_t   *source,
+                                                    target,           //qdr_terminus_t   *target,
+                                                    "ref.1",          //const char       *name,
+                                                    0,                //const char       *terminus_addr,
+                                                    &link_id);
+
+        target = qdr_terminus(0);
+        qdr_terminus_set_address(target, address2);
+        adaptor->out_link_2 = qdr_link_first_attach(adaptor->conn,
+                                                    QD_INCOMING,
+                                                    qdr_terminus(0),  //qdr_terminus_t   *source,
+                                                    target,           //qdr_terminus_t   *target,
+                                                    "ref.2",          //const char       *name,
+                                                    0,                //const char       *terminus_addr,
+                                                    &link_id);
     }
 }
 
@@ -120,7 +140,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
     
     printf("qdr_ref_flow: %d credits issued\n", credit);
 
-    if (link == adaptor->out_link) {
+    if (link == adaptor->out_link_1) {
         qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
         qd_compose_start_list(props);
         qd_compose_insert_null(props);                      // message-id
@@ -151,8 +171,34 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
         qd_message_compose_5(msg, props, &buffers, true);
         qd_compose_free(props);
 
-        qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0);
-        qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver");
+        qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0);
+        // Keep return-protection delivery reference as the adaptor's reference
+    } else if (link == adaptor->out_link_2) {
+        //
+        // Begin streaming a long message on the link.
+        //
+        qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+        qd_compose_start_list(props);
+        qd_compose_insert_null(props);                      // message-id
+        qd_compose_insert_null(props);                      // user-id
+        qd_compose_insert_null(props);                      // to
+        qd_compose_insert_null(props);                      // subject
+        qd_compose_insert_string(props, adaptor->reply_to); // reply-to
+        qd_compose_end_list(props);
+
+        props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
+
+        adaptor->streaming_message = qd_message();
+
+        qd_message_compose_5(adaptor->streaming_message, props, 0, false);
+        qd_compose_free(props);
+
+        adaptor->streaming_delivery =
+            qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0);
+        adaptor->stream_count = 0;
+        // Keep return-protection delivery reference as the adaptor's reference
+
+        qd_timer_schedule(adaptor->stream_timer, 1000);
     }
 }
 
@@ -210,7 +256,8 @@ static int qdr_ref_get_credit(void *context, qdr_link_t *link)
 
 static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
 {
-    char *dispname;
+    qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+    char              *dispname;
 
     switch (disp) {
     case PN_ACCEPTED: dispname = "ACCEPTED"; break;
@@ -221,6 +268,9 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
         dispname = "<UNKNOWN>";
     }
     printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false");
+
+    if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1)
+        qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery");
 }
 
 
@@ -272,16 +322,20 @@ static void on_startup(void *context)
                                           0);               // bind_token
 
     uint64_t link_id;
+
+    //
+    // Create a dynamic receiver
+    //
     qdr_terminus_t *dynamic_source = qdr_terminus(0);
     qdr_terminus_set_dynamic(dynamic_source);
 
-    adaptor->in_link = qdr_link_first_attach(adaptor->conn,
-                                             QD_OUTGOING,
-                                             dynamic_source,   //qdr_terminus_t   *source,
-                                             qdr_terminus(0),  //qdr_terminus_t   *target,
-                                             "ref.2",          //const char       *name,
-                                             0,                //const char       *terminus_addr,
-                                             &link_id);
+    adaptor->dynamic_in_link = qdr_link_first_attach(adaptor->conn,
+                                                     QD_OUTGOING,
+                                                     dynamic_source,   //qdr_terminus_t   *source,
+                                                     qdr_terminus(0),  //qdr_terminus_t   *target,
+                                                     "ref.0",          //const char       *name,
+                                                     0,                //const char       *terminus_addr,
+                                                     &link_id);
 }
 
 
@@ -293,6 +347,32 @@ static void on_activate(void *context)
 }
 
 
+static void on_stream(void *context)
+{
+    qdr_ref_adaptor_t *adaptor        = (qdr_ref_adaptor_t*) context;
+    const char        *content        = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    const size_t       content_length = strlen(content);
+    qd_buffer_t       *buf            = qd_buffer();
+    qd_buffer_list_t   buffers;
+
+    DEQ_INIT(buffers);
+    DEQ_INSERT_TAIL(buffers, buf);
+
+    memcpy(qd_buffer_cursor(buf), content, content_length);
+    qd_buffer_insert(buf, content_length);
+    qd_message_extend(adaptor->streaming_message, &buffers);
+    qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
+
+    if (adaptor->stream_count < 30) {
+        qd_timer_schedule(adaptor->stream_timer, 1000);
+        adaptor->stream_count++;
+    } else {
+        qd_message_set_receive_complete(adaptor->streaming_message);
+        adaptor->streaming_message = 0;
+    }
+}
+
+
 /**
  * This initialization function will be invoked when the router core is ready for the protocol
  * adaptor to be created.  This function must:
@@ -329,6 +409,7 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
     qd_timer_schedule(adaptor->startup_timer, 0);
 
     adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor);
+    adaptor->stream_timer   = qd_timer(core->qd, on_stream, adaptor);
 }
 
 
diff --git a/src/message.c b/src/message.c
index be0c7a8..87c559e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1232,6 +1232,15 @@ void qd_message_set_send_complete(qd_message_t *in_msg)
 }
 
 
+void qd_message_set_receive_complete(qd_message_t *in_msg)
+{
+    if (!!in_msg) {
+        qd_message_content_t *content = MSG_CONTENT(in_msg);
+        content->receive_complete = true;
+    }
+}
+
+
 bool qd_message_tag_sent(qd_message_t *in_msg)
 {
     if (!in_msg)
@@ -2209,6 +2218,13 @@ void qd_message_compose_5(qd_message_t        *msg,
 }
 
 
+void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    DEQ_APPEND(content->buffers, (*buffers));
+}
+
+
 qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index d6c7e40..88c4737 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -824,7 +824,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
 
         if (!qdr_delivery_receive_complete(dlv)) {
             qdr_delivery_set_aborted(dlv, true);
-            qdr_deliver_continue_peers_CT(core, dlv, false);
+            qdr_delivery_continue_peers_CT(core, dlv, false);
         }
 
         if (dlv->multicast) {
@@ -873,7 +873,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
 
         if (!qdr_delivery_receive_complete(dlv)) {
             qdr_delivery_set_aborted(dlv, true);
-            qdr_deliver_continue_peers_CT(core, dlv, false);
+            qdr_delivery_continue_peers_CT(core, dlv, false);
         }
 
         peer = qdr_delivery_first_peer_CT(dlv);
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 662ac2d..412683d 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -25,7 +25,7 @@ ALLOC_DEFINE(qdr_delivery_t);
 
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
                                          qdr_delivery_t *peer, uint64_t new_disp, bool settled,
@@ -211,10 +211,10 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
 }
 
 
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled)
+qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled)
 {
 
-    qdr_action_t   *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
+    qdr_action_t   *action = qdr_action(qdr_delivery_continue_CT, "delivery_continue");
     action->args.delivery.delivery = in_dlv;
 
     qd_message_t *msg = qdr_delivery_message(in_dlv);
@@ -222,7 +222,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bo
     action->args.delivery.presettled = settled;
 
     // This incref is for the action reference
-    qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
+    qdr_delivery_incref(in_dlv, "qdr_delivery_continue - add to action list");
     qdr_action_enqueue(core, action);
     return in_dlv;
 }
@@ -1050,7 +1050,7 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
 }
 
 
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
+void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
 {
     qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
 
@@ -1100,7 +1100,7 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, boo
 }
 
 
-static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
         return;
@@ -1123,7 +1123,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
     // If it is already in the undelivered list, don't try to deliver this again.
     //
     if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
-        qdr_deliver_continue_peers_CT(core, in_dlv, more);
+        qdr_delivery_continue_peers_CT(core, in_dlv, more);
 
         qd_message_t *msg = qdr_delivery_message(in_dlv);
 
@@ -1147,7 +1147,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
                 // We dont want to deal with such deliveries.
                 //
                 if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) {
-                    qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1");
+                    qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 1");
                     return;
                 }
 
@@ -1179,13 +1179,13 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
                 DEQ_REMOVE(link->settled, in_dlv);
                 // expect: action holds a ref to in_dlv, so it should not be freed here
                 assert(sys_atomic_get(&in_dlv->ref_count) > 1);
-                qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list");
+                qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from settled list");
             }
         }
     }
 
     // This decref is for the action reference
-    qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2");
+    qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 2");
 }
 
 
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index ce8fd60..7e50efb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -124,7 +124,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
                                        bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
 
 /* invoked when incoming message data arrives - schedule core thread */
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled);
+qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled);
 
 
 //
@@ -153,7 +153,7 @@ qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
 qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
 
 /* schedules all peer deliveries with work for I/O processing */
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
+void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
 
 /* update the links counters with respect to its delivery */
 void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
diff --git a/src/router_node.c b/src/router_node.c
index 95f9619..bd60542 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -436,7 +436,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     //
 
     if (delivery) {
-        qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd));
+        qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
         return next_delivery;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org