You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/13 15:35:53 UTC

qpid-dispatch git commit: DISPATCH-263 - Added a record with key PN_DELIVERY_CTX to hold the message. Dont accept the disposition until the full message has been received

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 150a05382 -> df9f69c12


DISPATCH-263 - Added a record with key PN_DELIVERY_CTX to hold the message. Dont accept the disposition until the full message has been received


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/df9f69c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/df9f69c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/df9f69c1

Branch: refs/heads/master
Commit: df9f69c1279ad22a6b5f30565d850028337e231a
Parents: 150a053
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Wed Apr 13 09:20:37 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed Apr 13 09:20:37 2016 -0400

----------------------------------------------------------------------
 src/message.c         | 13 ++++++++++---
 src/message_private.h |  2 ++
 src/router_node.c     | 13 +++++++++++++
 3 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 11e993c..ed248b3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -694,7 +694,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     pn_link_t        *link = pn_delivery_link(delivery);
     ssize_t           rc;
     qd_buffer_t      *buf;
-    qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_delivery_get_context(delivery);
+
+    pn_record_t *record    = pn_delivery_attachments(delivery);
+    qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
 
     //
     // If there is no message associated with the delivery, this is the first time
@@ -703,7 +705,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     //
     if (!msg) {
         msg = (qd_message_pvt_t*) qd_message();
-        pn_delivery_set_context(delivery, (void*) msg);
+        pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
     }
 
     //
@@ -728,15 +730,20 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         //
         if (rc == PN_EOS) {
             //
+            // Clear the value in the record with key PN_DELIVERY_CTX
+            //
+            pn_record_set(record, PN_DELIVERY_CTX, 0);
+
+            //
             // If the last buffer in the list is empty, remove it and free it.  This
             // will only happen if the size of the message content is an exact multiple
             // of the buffer size.
             //
+
             if (qd_buffer_size(buf) == 0) {
                 DEQ_REMOVE_TAIL(msg->content->buffers);
                 qd_buffer_free(buf);
             }
-            pn_delivery_set_context(delivery, 0);
 
             char repr[qd_message_repr_len()];
             qd_log(log_source, QD_LOG_TRACE, "Received %s on link %s",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 8ede2c7..139946d 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -104,6 +104,8 @@ ALLOC_DECLARE(qd_message_content_t);
 /** Initialize logging */
 void qd_message_initialize();
 
+PN_HANDLE(PN_DELIVERY_CTX)
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df9f69c1/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ed1b51c..d36fc84 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -26,6 +26,7 @@
 #include "dispatch_private.h"
 #include "entity_cache.h"
 #include "router_private.h"
+#include "message_private.h"
 
 const char *QD_ROUTER_NODE_TYPE = "router.node";
 const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
@@ -370,6 +371,18 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
     if (!delivery)
         return;
 
+    pn_record_t *record    = pn_delivery_attachments(pnd);
+
+    //
+    // On the delivery, we set the message (qd_message_pvt_t) as a record with key PN_DELIVERY_CTX. When the
+    // complete delivery is received (rc == PN_EOS), we set the value on the PN_DELIVERY_CTX record to zero.
+    // If the PN_DELIVERY_CTX record is non-zero, it means that the message is still in-flight (complete message
+    // not received yet) in which case we will ignore the disposition.
+    //
+    if (pn_record_get(record, PN_DELIVERY_CTX) != 0) {
+        return;
+    }
+
     //
     // If the delivery is settled, remove the linkage between the PN and QDR deliveries.
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org