You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:06 UTC

[qpid-dispatch] 20/32: Dataplane: Updates to the message-extend (return buffer count for flow control). Added bidirectional streaming test to ref adaptor.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 1463337dcc13a73b357fbfe15b5d8f0f8dba34a8
Author: Ted Ross <tr...@apache.org>
AuthorDate: Mon Jun 8 09:49:37 2020 -0400

    Dataplane: Updates to the message-extend (return buffer count for flow control). Added bidirectional streaming test to ref adaptor.
---
 include/qpid/dispatch/message.h                    |  8 ++-
 src/adaptors/reference_adaptor.c                   | 73 ++++++++++++++++------
 src/message.c                                      |  5 +-
 .../streaming_link_scrubber.c                      |  2 +-
 4 files changed, 67 insertions(+), 21 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 0f63b1a..1a7bce2 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -294,9 +294,15 @@ void qd_message_compose_5(qd_message_t        *msg,
                           bool                 complete);
 
 /**
+ * qd_message_extend
+ *
  * Extend the content of a streaming message with more buffers.
+ *
+ * @param msg Pointer to a message
+ * @param buffers A list of buffers to be appended to the end of the message's stream
+ * @return The total number of buffers held in the message's content after the append
  */
-void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
 
 
 /** Put string representation of a message suitable for logging in buffer.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index f227542..7a65c17 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -37,6 +37,7 @@ typedef struct qdr_ref_adaptor_t {
     qdr_connection_t       *conn;
     qdr_link_t             *out_link_1;
     qdr_link_t             *out_link_2;
+    qdr_link_t             *in_link_2;
     qdr_link_t             *dynamic_in_link;
     char                   *reply_to;
     qd_message_t           *streaming_message;
@@ -123,6 +124,16 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
                                                     "ref.2",          //const char       *name,
                                                     0,                //const char       *terminus_addr,
                                                     &link_id);
+
+        source = qdr_terminus(0);
+        qdr_terminus_set_address(source, address2);
+        adaptor->in_link_2 = qdr_link_first_attach(adaptor->conn,
+                                                   QD_OUTGOING,
+                                                   source,           //qdr_terminus_t   *source,
+                                                   qdr_terminus(0),  //qdr_terminus_t   *target,
+                                                   "ref.3",          //const char       *name,
+                                                   0,                //const char       *terminus_addr,
+                                                   &link_id);
     }
 }
 
@@ -193,6 +204,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
         qd_message_compose_5(adaptor->streaming_message, props, 0, false);
         qd_compose_free(props);
 
+        printf("qdr_ref_flow: Starting a streaming delivery\n");
         adaptor->streaming_delivery =
             qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0);
         adaptor->stream_count = 0;
@@ -231,20 +243,34 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
     qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
     qd_message_t      *msg     = qdr_delivery_message(delivery);
 
-    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
-    if (status == QD_MESSAGE_DEPTH_OK) {
-        qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
-        char *body = (char*) qd_iterator_copy(body_iter);
-        printf("qdr_ref_deliver: message received, body=%s\n", body);
-        free(body);
-        qd_iterator_free(body_iter);
-        qd_message_set_send_complete(msg);
+    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES);
+
+    switch (status) {
+    case QD_MESSAGE_DEPTH_OK: {
+        if (qd_message_receive_complete(msg)) {
+            qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
+            char *body = (char*) qd_iterator_copy(body_iter);
+            printf("qdr_ref_deliver: complete message received, body=%s\n", body);
+            free(body);
+            qd_iterator_free(body_iter);
+            qd_message_set_send_complete(msg);
+            qdr_link_flow(adaptor->core, link, 1, false);
+            return PN_ACCEPTED; // This will cause the delivery to be settled
+        }
+        break;
     }
 
-    qdr_link_flow(adaptor->core, link, 1, false);
+    case QD_MESSAGE_DEPTH_INVALID:
+        printf("qdr_ref_deliver: message invalid\n");
+        qdr_link_flow(adaptor->core, link, 1, false);
+        break;
+
+    case QD_MESSAGE_DEPTH_INCOMPLETE:
+        printf("qdr_ref_deliver: message incomplete\n");
+        break;
+    }
 
-    return PN_ACCEPTED; // This will cause the delivery to be settled
+    return 0;
 }
 
 
@@ -269,7 +295,12 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
     }
     printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false");
 
-    if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1)
+    if (qdr_delivery_link(dlv) == adaptor->out_link_2 && qdr_delivery_message(dlv) == adaptor->streaming_message) {
+        adaptor->streaming_message = 0;
+        adaptor->stream_count      = 0;
+    }
+
+    if (settled)
         qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery");
 }
 
@@ -352,23 +383,30 @@ static void on_stream(void *context)
     qdr_ref_adaptor_t *adaptor        = (qdr_ref_adaptor_t*) context;
     const char        *content        = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
     const size_t       content_length = strlen(content);
-    qd_buffer_t       *buf            = qd_buffer();
-    qd_buffer_list_t   buffers;
+
+    if (!adaptor->streaming_message)
+        return;
+
+    qd_buffer_t      *buf = qd_buffer();
+    qd_buffer_list_t  buffers;
 
     DEQ_INIT(buffers);
     DEQ_INSERT_TAIL(buffers, buf);
 
     memcpy(qd_buffer_cursor(buf), content, content_length);
     qd_buffer_insert(buf, content_length);
-    qd_message_extend(adaptor->streaming_message, &buffers);
+    int depth = qd_message_extend(adaptor->streaming_message, &buffers);
     qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
 
-    if (adaptor->stream_count < 30) {
-        qd_timer_schedule(adaptor->stream_timer, 1000);
+    if (adaptor->stream_count < 10) {
+        qd_timer_schedule(adaptor->stream_timer, 100);
         adaptor->stream_count++;
+        printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth);
     } else {
         qd_message_set_receive_complete(adaptor->streaming_message);
         adaptor->streaming_message = 0;
+        adaptor->stream_count      = 0;
+        printf("on_stream: completed streaming send, depth=%d\n", depth);
     }
 }
 
@@ -404,7 +442,6 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
                                             qdr_ref_conn_trace);
     *adaptor_context = adaptor;
 
-    // TEMPORARY //
     adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor);
     qd_timer_schedule(adaptor->startup_timer, 0);
 
diff --git a/src/message.c b/src/message.c
index 8b833f3..97bcab2 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2218,10 +2218,11 @@ void qd_message_compose_5(qd_message_t        *msg,
 }
 
 
-void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
 {
     qd_message_content_t *content = MSG_CONTENT(msg);
     qd_buffer_t          *buf     = DEQ_HEAD(*buffers);
+    int                   count;
 
     LOCK(content->lock);
     while (buf) {
@@ -2230,7 +2231,9 @@ void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
     }
 
     DEQ_APPEND(content->buffers, (*buffers));
+    count = DEQ_SIZE(content->buffers);
     UNLOCK(content->lock);
+    return count;
 }
 
 
diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
index 1c578ca..126a09b 100644
--- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
+++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
@@ -184,4 +184,4 @@ static void qcm_streaming_link_scrubber_final_CT(void *module_context)
 }
 
 
-QDR_CORE_MODULE_DECLARE("streaming_link_scruber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)
+QDR_CORE_MODULE_DECLARE("streaming_link_scrubber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org