You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/10/18 14:43:25 UTC

[03/15] qpid-dispatch git commit: DISPATCH-829: handle aborted messages

DISPATCH-829: handle aborted messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a1726fac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a1726fac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a1726fac

Branch: refs/heads/master
Commit: a1726fac6b8720476c66e908e4526a4f507f0952
Parents: 4be95cd
Author: Chuck Rolke <cr...@redhat.com>
Authored: Mon Oct 2 16:06:57 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 18 08:11:40 2017 -0400

----------------------------------------------------------------------
 src/message.c     | 14 +++++++++
 src/router_node.c | 79 ++++++++++++++++++++++++++++++++++----------------
 2 files changed, 68 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a1726fac/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index cf02ed2..9a261e1 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1410,6 +1410,17 @@ void qd_message_send(qd_message_t *in_msg,
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
+        if (msg->content->aborted) {
+            // Message is aborted before any part of it has been sent.
+            // Declare the message to be sent,
+            msg->send_complete = true;
+            // the link has an outgoing deliver. abort it.
+            pn_delivery_abort(pn_link_current(pnl));
+
+            // TODO: Dispose of message buffers that may have accumulated
+            return;
+        }
+
         qd_buffer_list_t new_ma;
         qd_buffer_list_t new_ma_trailer;
         DEQ_INIT(new_ma);
@@ -1560,6 +1571,9 @@ void qd_message_send(qd_message_t *in_msg,
                 msg->cursor.buffer = 0;
                 msg->cursor.cursor = 0;
 
+                if (msg->content->aborted) {
+                    pn_delivery_abort(pn_link_current(pnl));
+                }
             }
             else {
                 //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a1726fac/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ff0fa2c..4245189 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -216,18 +216,23 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
         //
         pn_link_advance(pn_link);
 
-        // Since the entire message has been received, we can print out its contents to the log if necessary.
-        if (cf->log_message) {
-            char repr[qd_message_repr_len()];
-            char* message_repr = qd_message_repr((qd_message_t*)msg,
-                                                 repr,
-                                                 sizeof(repr),
-                                                 cf->log_bits);
-            if (message_repr) {
-                qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s",
-                       pn_link_name(pn_link),
-                       message_repr);
+        if (!qd_message_aborted(msg)) {
+            // Since the entire message has been received, we can print out its contents to the log if necessary.
+            if (cf->log_message) {
+                char repr[qd_message_repr_len()];
+                char* message_repr = qd_message_repr((qd_message_t*)msg,
+                                                    repr,
+                                                    sizeof(repr),
+                                                    cf->log_bits);
+                if (message_repr) {
+                    qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s",
+                        pn_link_name(pn_link),
+                        message_repr);
+                }
             }
+        } else {
+            qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link '%s' received aborted message",
+                       pn_link_name(pn_link));
         }
 
         // Link stalling may have ignored some delivery events.
@@ -338,11 +343,13 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
 
     if (delivery) {
         qdr_deliver_continue(delivery);
-
-        if (pn_delivery_settled(pnd) && receive_complete) {
-            pn_delivery_settle(pnd);
-            qdr_delivery_decref(router->router_core, delivery);
+        if (receive_complete) {
+          if (pn_delivery_settled(pnd) || pn_delivery_aborted(pnd)) {
+              pn_delivery_settle(pnd);
+              qdr_delivery_decref(router->router_core, delivery);
+          }
         }
+
         return;
     }
 
@@ -531,9 +538,10 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
         qdr_delivery_set_cleared_proton_ref(delivery, true);
 
         //
-        // Don't decref the delivery here.  Rather, we will _give_ the reference to the core.
+        // Don't decref the delivery here.  Rather, we will _give_ the reference to the core if the delivery is not aborted.
         //
-        give_reference = true;
+        if (!pn_delivery_aborted(pnd))
+            give_reference = true;
     }
 
     //
@@ -1297,16 +1305,37 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
     bool send_complete = qdr_delivery_send_complete(dlv);
 
     if (send_complete) {
-        if (!settled && remote_snd_settled) {
-            // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
-            qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
-        }
+        if (qd_message_aborted(msg_out)) {
+
+            // This message has been aborted.
+            // When a sender aborts a message the message is implicitly settled.
+            // Tell the core that the delivery has been rejected and settled.
+            qdr_delivery_update_disposition(router->router_core, dlv, PN_REJECTED, true, 0, 0, false);
+
+            // Aborted messages must be settled locally
+            // Settling does not produce any disposition to message sender.
+            if (pdlv) {
+                qdr_delivery_set_context(dlv, 0);
+                pn_delivery_set_context(pdlv, 0);
+                pn_link_advance(plink);
+                pn_delivery_settle(pdlv);
+                qdr_delivery_set_cleared_proton_ref(dlv, true);
+                qdr_delivery_decref(router->router_core, dlv);
+            }
 
-        pn_link_advance(plink);
+        } else {
+            if (!settled && remote_snd_settled) {
+                // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
+                qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
+            }
+
+            pn_link_advance(plink);
+
+            if (settled || remote_snd_settled) {
+                if (pdlv)
+                    pn_delivery_settle(pdlv);
+            }
 
-        if (settled || remote_snd_settled) {
-            if (pdlv)
-                pn_delivery_settle(pdlv);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org