You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/13 15:35:53 UTC
qpid-dispatch git commit: DISPATCH-263 - Added a record with key
PN_DELIVERY_CTX to hold the message. Don't accept the disposition until the
full message has been received
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 150a05382 -> df9f69c12
DISPATCH-263 - Added a record with key PN_DELIVERY_CTX to hold the message. Don't accept the disposition until the full message has been received
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/df9f69c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/df9f69c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/df9f69c1
Branch: refs/heads/master
Commit: df9f69c1279ad22a6b5f30565d850028337e231a
Parents: 150a053
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Wed Apr 13 09:20:37 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed Apr 13 09:20:37 2016 -0400
----------------------------------------------------------------------
src/message.c | 13 ++++++++++---
src/message_private.h | 2 ++
src/router_node.c | 13 +++++++++++++
3 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 11e993c..ed248b3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -694,7 +694,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
pn_link_t *link = pn_delivery_link(delivery);
ssize_t rc;
qd_buffer_t *buf;
- qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_delivery_get_context(delivery);
+
+ pn_record_t *record = pn_delivery_attachments(delivery);
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
//
// If there is no message associated with the delivery, this is the first time
@@ -703,7 +705,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
- pn_delivery_set_context(delivery, (void*) msg);
+ pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
}
//
@@ -728,15 +730,20 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
//
if (rc == PN_EOS) {
//
+ // Clear the value in the record with key PN_DELIVERY_CTX
+ //
+ pn_record_set(record, PN_DELIVERY_CTX, 0);
+
+ //
// If the last buffer in the list is empty, remove it and free it. This
// will only happen if the size of the message content is an exact multiple
// of the buffer size.
//
+
if (qd_buffer_size(buf) == 0) {
DEQ_REMOVE_TAIL(msg->content->buffers);
qd_buffer_free(buf);
}
- pn_delivery_set_context(delivery, 0);
char repr[qd_message_repr_len()];
qd_log(log_source, QD_LOG_TRACE, "Received %s on link %s",
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 8ede2c7..139946d 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -104,6 +104,8 @@ ALLOC_DECLARE(qd_message_content_t);
/** Initialize logging */
void qd_message_initialize();
+PN_HANDLE(PN_DELIVERY_CTX)
+
///@}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ed1b51c..d36fc84 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -26,6 +26,7 @@
#include "dispatch_private.h"
#include "entity_cache.h"
#include "router_private.h"
+#include "message_private.h"
const char *QD_ROUTER_NODE_TYPE = "router.node";
const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
@@ -370,6 +371,18 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
if (!delivery)
return;
+ pn_record_t *record = pn_delivery_attachments(pnd);
+
+ //
+ // On the delivery, we set the message (qd_message_pvt_t) as a record with key PN_DELIVERY_CTX. When the
+ // complete delivery is received (rc == PN_EOS), we set the value on the PN_DELIVERY_CTX record to zero.
+ // If the PN_DELIVERY_CTX record is non-zero, it means that the message is still in-flight (complete message
+ // not received yet) in which case we will ignore the disposition.
+ //
+ if (pn_record_get(record, PN_DELIVERY_CTX) != 0) {
+ return;
+ }
+
//
// If the delivery is settled, remove the linkage between the PN and QDR deliveries.
//
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org