You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/03/04 17:42:31 UTC

[qpid-dispatch] 01/01: DISPATCH-1988: Added code to start routing a delivery only if there is some data in the content->buffers or in content->pending. If we never receive any data, the delivery is rejected

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch DISPATCH-1988
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit bbe93f40bae1019fad1acb52c9140c0dab64b77e
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Thu Mar 4 12:42:00 2021 -0500

    DISPATCH-1988: Added code to start routing a delivery only if there is some data in the content->buffers or in content->pending. If we never receive any data, the delivery is rejected
---
 include/qpid/dispatch/message.h |  8 ++++++++
 src/message.c                   | 20 ++++++++++++++++++--
 src/router_node.c               | 36 ++++++++++++++++++++++++++++++++++++
 3 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 07e144a..1558dde 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -248,6 +248,14 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery);
 qd_message_t * qd_get_message_context(pn_delivery_t *delivery);
 
 /**
+ * Returns true if there is at least one non-empty buffer at the head of the content->buffers list
+ * or if the content->pending buffer is non-empty.
+ *
+ * @param msg A pointer to a message.
+ */
+bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t   *msg);
+
+/**
  * Send the message outbound on an outgoing link.
  *
  * @param msg A pointer to a message to be sent.
diff --git a/src/message.c b/src/message.c
index 1679407..3508a99 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1255,8 +1255,6 @@ void qd_message_add_fanout(qd_message_t *in_msg,
     // DISPATCH-1590: content->buffers may not be set up yet if
     // content->pending is the first buffer and it is not yet full.
     if (!buf) {
-        // assumption: proton will never signal a readable delivery if there is
-        // no data at all.
         assert(content->pending && qd_buffer_size(content->pending) > 0);
         DEQ_INSERT_TAIL(content->buffers, content->pending);
         content->pending = 0;
@@ -1422,6 +1420,24 @@ qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
     return 0;
 }
 
+bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t   *msg)
+{
+    if (!msg)
+        return false;
+
+    if (MSG_CONTENT(msg)) {
+        if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) {
+            qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers);
+            if (buf && qd_buffer_size(buf) > 0)
+                return true;
+        }
+        if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0)
+            return true;
+    }
+
+    return false;
+}
+
 
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
diff --git a/src/router_node.c b/src/router_node.c
index 70d98f9..bccf6cf 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -451,6 +451,42 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     qd_message_t   *msg   = qd_message_receive(pnd);
     bool receive_complete = qd_message_receive_complete(msg);
 
+    //
+    // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, it calls  qd_message_receive(). When qd_message_receive() returns, we check here if
+    // there are any data in the content buffers. If there is no content in the buffers, there is no reason to route the delivery. We will wait for some data
+    // in the buffers before we start to route the delivery.
+    // Notice that the if statement checks for the existence of a delivery (qdr_delivery_t). Existence of a delivery means that the delivery has been routed when
+    // there was data in the buffers (When a delivery has been routed successfully, the delivery (qdr_delivery_t) will be non null)
+    //
+    // The following if statement will deal with the following cases:-
+    // 1. We receive one empty transfer frame with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false
+    //    In this case, there is no data at all in the message content buffers, we will reject the message when receive_complete=true. We will never route this
+    //    delivery, so core thread will not be involved
+    // 2. We receive 2 or more empty transfer frames with more=true followed by another empty transfer frame with (more=false and abort=true) or with just more=false
+    //    This case is similar to #1. We have no content in any of the buffers, we will reject this message after receive_complete=true. We will never route this
+    //    delivery, so core thread will not be involved
+    // 3. Exactly one empty transfer frame with more=false and abort=false
+    //    In this case, again there is still no content in any of the buffers, we will reject this message. Again, we will not route this message, so the core thread is not involved.
+    //
+    if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) {
+        if (receive_complete) {
+            // There is no qdr_delivery_t (delivery) yet which means this message has not been routed yet (the first run of this function is not complete yet) and
+            // the message is fully received (receive_complete=true) but there is no content in the message buffers.
+            // This is only possible if there were one or more empty transfer frames.
+            // Since there is nothing in the message, we will reject it (AMQP message must have a non empty message body)
+            pn_link_flow(pn_link, 1);
+            if (pn_delivery_aborted(pnd))
+                qd_message_set_discard(msg, true);
+            pn_delivery_update(pnd, PN_REJECTED);
+            pn_delivery_settle(pnd);
+            // qd_message_free will free all the associated content buffers and also the content->pending buffer
+            qd_message_free(msg);
+            qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to empty message");
+        }
+
+        return false;
+    }
+
     if (!qd_message_oversize(msg)) {
         // message not rejected as oversize
         if (receive_complete) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org