You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/04/01 17:57:25 UTC

qpid-dispatch git commit: DISPATCH-235 - Added code so the router respects the receiver's send settle mode

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master f3561a66c -> 20447d578


DISPATCH-235 - Added code so the router respects the receiver's send settle mode


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/20447d57
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/20447d57
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/20447d57

Branch: refs/heads/master
Commit: 20447d5782ea12281b8e366d7d715f58fba2ab21
Parents: f3561a6
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Apr 1 11:42:30 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Apr 1 11:42:30 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/container.h |  1 +
 src/container.c                   | 28 ++++++++++++-------
 src/router_node.c                 | 20 ++++++++++---
 tests/system_tests_one_router.py  | 51 ++++++++++++++++++++++++++++++++--
 4 files changed, 84 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 22c6870..0dcbb95 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -171,6 +171,7 @@ void *qd_link_get_conn_context(qd_link_t *link);
 
 void policy_notify_opened(void *container, qd_connection_t *conn, void *context);
 qd_direction_t qd_link_direction(const qd_link_t *link);
+pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link);
 qd_connection_t *qd_link_connection(qd_link_t *link);
 pn_link_t *qd_link_pn(qd_link_t *link);
 pn_session_t *qd_link_pn_session(qd_link_t *link);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 4d2f513..c97b5a4 100644
--- a/src/container.c
+++ b/src/container.c
@@ -50,16 +50,16 @@ DEQ_DECLARE(qd_node_t, qd_node_list_t);
 ALLOC_DECLARE(qd_node_t);
 ALLOC_DEFINE(qd_node_t);
 
-
 /** Encapsulates a proton link for sending and receiving messages */
 struct qd_link_t {
-    pn_session_t       *pn_sess;
-    pn_link_t          *pn_link;
-    qd_direction_t      direction;
-    void               *context;
-    qd_node_t          *node;
-    bool                drain_mode;
-    bool                close_sess_with_link;
+    pn_session_t               *pn_sess;
+    pn_link_t                  *pn_link;
+    qd_direction_t              direction;
+    void                       *context;
+    qd_node_t                  *node;
+    bool                        drain_mode;
+    bool                        close_sess_with_link;
+    pn_snd_settle_mode_t        remote_snd_settle_mode;
 };
 
 ALLOC_DECLARE(qd_link_t);
@@ -125,8 +125,10 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
     link->direction  = QD_OUTGOING;
     link->context    = 0;
     link->node       = node;
-    link->drain_mode = pn_link_get_drain(pn_link);
-    link->close_sess_with_link = false;
+
+    link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);
+    link->drain_mode             = pn_link_get_drain(pn_link);
+    link->close_sess_with_link   = false;
 
     //
     // Keep the borrowed references
@@ -180,6 +182,7 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link)
     link->context    = 0;
     link->node       = node;
     link->drain_mode = pn_link_get_drain(pn_link);
+    link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);
     link->close_sess_with_link = false;
 
     //
@@ -774,6 +777,7 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
     link->context    = node->context;
     link->node       = node;
     link->drain_mode = pn_link_get_drain(link->pn_link);
+    link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(link->pn_link);
     link->close_sess_with_link = true;
 
     //
@@ -863,6 +867,10 @@ qd_direction_t qd_link_direction(const qd_link_t *link)
     return link->direction;
 }
 
+pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link)
+{
+    return link->remote_snd_settle_mode;
+}
 
 qd_connection_t *qd_link_connection(qd_link_t *link)
 {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 41a8d9a..e6527b0 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -733,8 +733,9 @@ static void CORE_link_push(void *context, qdr_link_t *link)
 
 static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
 {
-    qd_link_t  *qlink = (qd_link_t*) qdr_link_get_context(link);
-    pn_link_t  *plink = qd_link_pn(qlink);
+    qd_router_t *router = (qd_router_t*) context;
+    qd_link_t  *qlink   = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t  *plink   = qd_link_pn(qlink);
     const char *tag;
     int         tag_length;
 
@@ -743,14 +744,25 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
     pn_delivery(plink, pn_dtag(tag, tag_length));
     pn_delivery_t *pdlv = pn_link_current(plink);
 
-    if (!settled) {
+    //
+    // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
+    //
+    bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
+
+    if (!settled && !remote_snd_settled) {
         pn_delivery_set_context(pdlv, dlv);
         qdr_delivery_set_context(dlv, pdlv);
     }
 
     qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
-    if (settled)
+
+    if (remote_snd_settled)
+        // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
+        qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true);
+
+    if (settled || remote_snd_settled)
         pn_delivery_settle(pdlv);
+
     pn_link_advance(plink);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 208765e..028ba1a 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -18,8 +18,10 @@
 #
 
 import unittest
-from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, Timeout
-from system_test import TestCase, Messenger, Qdrouterd, main_module
+from proton import Message, PENDING, ACCEPTED, REJECTED
+from system_test import TestCase, Qdrouterd, main_module
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
 
 # PROTON-828:
 try:
@@ -1041,6 +1043,51 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
+    def test_send_settle_mode_settled(self):
+        """
+        The receiver sets a snd-settle-mode of 'settled', thus indicating that it wants to receive settled messages from
+        the sender. This test makes sure that the delivery that comes to the receiver comes as already settled.
+        """
+        send_settle_mode_test = SndSettleModeTest(self.address)
+        send_settle_mode_test.run()
+        self.assertTrue(send_settle_mode_test.message_received)
+        self.assertTrue(send_settle_mode_test.delivery_already_settled)
+
+
+HELLO_WORLD = "Hello World!"
+
+class SndSettleModeTest(MessagingHandler):
+    def __init__(self, address):
+        super(SndSettleModeTest, self).__init__()
+        self.address = address
+        self.sender = None
+        self.receiver = None
+        self.message_received = False
+        self.delivery_already_settled = False
+
+    def on_start(self, event):
+        conn = event.container.connect(self.address)
+        # The receiver sets link.snd_settle_mode = Link.SND_SETTLED. It wants to receive settled messages
+        self.receiver = event.container.create_receiver(conn, "org/apache/dev", options=AtMostOnce())
+
+        # With AtLeastOnce, the sender will not settle.
+        self.sender = event.container.create_sender(conn, "org/apache/dev", options=AtLeastOnce())
+
+    def on_sendable(self, event):
+        msg = Message(body=HELLO_WORLD)
+        event.sender.send(msg)
+        event.sender.close()
+
+    def on_message(self, event):
+        self.delivery_already_settled = event.delivery.settled
+        if HELLO_WORLD == event.message.body:
+            self.message_received = True
+        else:
+            self.message_received = False
+        event.connection.close()
+
+    def run(self):
+        Container(self).run()
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org