You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/10/18 14:43:25 UTC
[03/15] qpid-dispatch git commit: DISPATCH-829: handle aborted
messages
DISPATCH-829: handle aborted messages
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a1726fac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a1726fac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a1726fac
Branch: refs/heads/master
Commit: a1726fac6b8720476c66e908e4526a4f507f0952
Parents: 4be95cd
Author: Chuck Rolke <cr...@redhat.com>
Authored: Mon Oct 2 16:06:57 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 18 08:11:40 2017 -0400
----------------------------------------------------------------------
src/message.c | 14 +++++++++
src/router_node.c | 79 ++++++++++++++++++++++++++++++++++----------------
2 files changed, 68 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a1726fac/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index cf02ed2..9a261e1 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1410,6 +1410,17 @@ void qd_message_send(qd_message_t *in_msg,
if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
+ if (msg->content->aborted) {
+ // Message is aborted before any part of it has been sent.
+ // Declare the message to be sent,
+ msg->send_complete = true;
+ // the link has an outgoing deliver. abort it.
+ pn_delivery_abort(pn_link_current(pnl));
+
+ // TODO: Dispose of message buffers that may have accumulated
+ return;
+ }
+
qd_buffer_list_t new_ma;
qd_buffer_list_t new_ma_trailer;
DEQ_INIT(new_ma);
@@ -1560,6 +1571,9 @@ void qd_message_send(qd_message_t *in_msg,
msg->cursor.buffer = 0;
msg->cursor.cursor = 0;
+ if (msg->content->aborted) {
+ pn_delivery_abort(pn_link_current(pnl));
+ }
}
else {
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a1726fac/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ff0fa2c..4245189 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -216,18 +216,23 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
//
pn_link_advance(pn_link);
- // Since the entire message has been received, we can print out its contents to the log if necessary.
- if (cf->log_message) {
- char repr[qd_message_repr_len()];
- char* message_repr = qd_message_repr((qd_message_t*)msg,
- repr,
- sizeof(repr),
- cf->log_bits);
- if (message_repr) {
- qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s",
- pn_link_name(pn_link),
- message_repr);
+ if (!qd_message_aborted(msg)) {
+ // Since the entire message has been received, we can print out its contents to the log if necessary.
+ if (cf->log_message) {
+ char repr[qd_message_repr_len()];
+ char* message_repr = qd_message_repr((qd_message_t*)msg,
+ repr,
+ sizeof(repr),
+ cf->log_bits);
+ if (message_repr) {
+ qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s",
+ pn_link_name(pn_link),
+ message_repr);
+ }
}
+ } else {
+ qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link '%s' received aborted message",
+ pn_link_name(pn_link));
}
// Link stalling may have ignored some delivery events.
@@ -338,11 +343,13 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
if (delivery) {
qdr_deliver_continue(delivery);
-
- if (pn_delivery_settled(pnd) && receive_complete) {
- pn_delivery_settle(pnd);
- qdr_delivery_decref(router->router_core, delivery);
+ if (receive_complete) {
+ if (pn_delivery_settled(pnd) || pn_delivery_aborted(pnd)) {
+ pn_delivery_settle(pnd);
+ qdr_delivery_decref(router->router_core, delivery);
+ }
}
+
return;
}
@@ -531,9 +538,10 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
qdr_delivery_set_cleared_proton_ref(delivery, true);
//
- // Don't decref the delivery here. Rather, we will _give_ the reference to the core.
+ // Don't decref the delivery here. Rather, we will _give_ the reference to the core if the delivery is not aborted.
//
- give_reference = true;
+ if (!pn_delivery_aborted(pnd))
+ give_reference = true;
}
//
@@ -1297,16 +1305,37 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
bool send_complete = qdr_delivery_send_complete(dlv);
if (send_complete) {
- if (!settled && remote_snd_settled) {
- // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
- qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
- }
+ if (qd_message_aborted(msg_out)) {
+
+ // This message has been aborted.
+ // When a sender aborts a message the message is implicitly settled.
+ // Tell the core that the delivery has been rejected and settled.
+ qdr_delivery_update_disposition(router->router_core, dlv, PN_REJECTED, true, 0, 0, false);
+
+ // Aborted messages must be settled locally
+ // Settling does not produce any disposition to message sender.
+ if (pdlv) {
+ qdr_delivery_set_context(dlv, 0);
+ pn_delivery_set_context(pdlv, 0);
+ pn_link_advance(plink);
+ pn_delivery_settle(pdlv);
+ qdr_delivery_set_cleared_proton_ref(dlv, true);
+ qdr_delivery_decref(router->router_core, dlv);
+ }
- pn_link_advance(plink);
+ } else {
+ if (!settled && remote_snd_settled) {
+ // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
+ qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false);
+ }
+
+ pn_link_advance(plink);
+
+ if (settled || remote_snd_settled) {
+ if (pdlv)
+ pn_delivery_settle(pdlv);
+ }
- if (settled || remote_snd_settled) {
- if (pdlv)
- pn_delivery_settle(pdlv);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org