You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/04/01 17:57:25 UTC
qpid-dispatch git commit: DISPATCH-235 - Added code so the router
respects the receiver's send settle mode
Repository: qpid-dispatch
Updated Branches:
refs/heads/master f3561a66c -> 20447d578
DISPATCH-235 - Added code so the router respects the receiver's send settle mode
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/20447d57
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/20447d57
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/20447d57
Branch: refs/heads/master
Commit: 20447d5782ea12281b8e366d7d715f58fba2ab21
Parents: f3561a6
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Apr 1 11:42:30 2016 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Apr 1 11:42:30 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/container.h | 1 +
src/container.c | 28 ++++++++++++-------
src/router_node.c | 20 ++++++++++---
tests/system_tests_one_router.py | 51 ++++++++++++++++++++++++++++++++--
4 files changed, 84 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 22c6870..0dcbb95 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -171,6 +171,7 @@ void *qd_link_get_conn_context(qd_link_t *link);
void policy_notify_opened(void *container, qd_connection_t *conn, void *context);
qd_direction_t qd_link_direction(const qd_link_t *link);
+pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link);
qd_connection_t *qd_link_connection(qd_link_t *link);
pn_link_t *qd_link_pn(qd_link_t *link);
pn_session_t *qd_link_pn_session(qd_link_t *link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 4d2f513..c97b5a4 100644
--- a/src/container.c
+++ b/src/container.c
@@ -50,16 +50,16 @@ DEQ_DECLARE(qd_node_t, qd_node_list_t);
ALLOC_DECLARE(qd_node_t);
ALLOC_DEFINE(qd_node_t);
-
/** Encapsulates a proton link for sending and receiving messages */
struct qd_link_t {
- pn_session_t *pn_sess;
- pn_link_t *pn_link;
- qd_direction_t direction;
- void *context;
- qd_node_t *node;
- bool drain_mode;
- bool close_sess_with_link;
+ pn_session_t *pn_sess;
+ pn_link_t *pn_link;
+ qd_direction_t direction;
+ void *context;
+ qd_node_t *node;
+ bool drain_mode;
+ bool close_sess_with_link;
+ pn_snd_settle_mode_t remote_snd_settle_mode;
};
ALLOC_DECLARE(qd_link_t);
@@ -125,8 +125,10 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
link->direction = QD_OUTGOING;
link->context = 0;
link->node = node;
- link->drain_mode = pn_link_get_drain(pn_link);
- link->close_sess_with_link = false;
+
+ link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);
+ link->drain_mode = pn_link_get_drain(pn_link);
+ link->close_sess_with_link = false;
//
// Keep the borrowed references
@@ -180,6 +182,7 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link)
link->context = 0;
link->node = node;
link->drain_mode = pn_link_get_drain(pn_link);
+ link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(pn_link);
link->close_sess_with_link = false;
//
@@ -774,6 +777,7 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
link->context = node->context;
link->node = node;
link->drain_mode = pn_link_get_drain(link->pn_link);
+ link->remote_snd_settle_mode = pn_link_remote_snd_settle_mode(link->pn_link);
link->close_sess_with_link = true;
//
@@ -863,6 +867,10 @@ qd_direction_t qd_link_direction(const qd_link_t *link)
return link->direction;
}
+pn_snd_settle_mode_t qd_link_remote_snd_settle_mode(const qd_link_t *link)
+{
+ return link->remote_snd_settle_mode;
+}
qd_connection_t *qd_link_connection(qd_link_t *link)
{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 41a8d9a..e6527b0 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -733,8 +733,9 @@ static void CORE_link_push(void *context, qdr_link_t *link)
static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled)
{
- qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
- pn_link_t *plink = qd_link_pn(qlink);
+ qd_router_t *router = (qd_router_t*) context;
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
const char *tag;
int tag_length;
@@ -743,14 +744,25 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
pn_delivery(plink, pn_dtag(tag, tag_length));
pn_delivery_t *pdlv = pn_link_current(plink);
- if (!settled) {
+ //
+ // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
+ //
+ bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
+
+ if (!settled && !remote_snd_settled) {
pn_delivery_set_context(pdlv, dlv);
qdr_delivery_set_context(dlv, pdlv);
}
qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
- if (settled)
+
+ if (remote_snd_settled)
+ // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
+ qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true);
+
+ if (settled || remote_snd_settled)
pn_delivery_settle(pdlv);
+
pn_link_advance(plink);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/20447d57/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 208765e..028ba1a 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -18,8 +18,10 @@
#
import unittest
-from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, Timeout
-from system_test import TestCase, Messenger, Qdrouterd, main_module
+from proton import Message, PENDING, ACCEPTED, REJECTED
+from system_test import TestCase, Qdrouterd, main_module
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
# PROTON-828:
try:
@@ -1041,6 +1043,51 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
+ def test_send_settle_mode_settled(self):
+ """
+ The receiver sets a snd-settle-mode of settle thus indicating that it wants to receive settled messages from
+ the sender. This tests make sure that the delivery that comes to the receiver comes as already settled.
+ """
+ send_settle_mode_test = SndSettleModeTest(self.address)
+ send_settle_mode_test.run()
+ self.assertTrue(send_settle_mode_test.message_received)
+ self.assertTrue(send_settle_mode_test.delivery_already_settled)
+
+
+HELLO_WORLD = "Hello World!"
+
+class SndSettleModeTest(MessagingHandler):
+ def __init__(self, address):
+ super(SndSettleModeTest, self).__init__()
+ self.address = address
+ self.sender = None
+ self.receiver = None
+ self.message_received = False
+ self.delivery_already_settled = False
+
+ def on_start(self, event):
+ conn = event.container.connect(self.address)
+ # The receiver sets link.snd_settle_mode = Link.SND_SETTLED. It wants to receive settled messages
+ self.receiver = event.container.create_receiver(conn, "org/apache/dev", options=AtMostOnce())
+
+ # With AtLeastOnce, the sender will not settle.
+ self.sender = event.container.create_sender(conn, "org/apache/dev", options=AtLeastOnce())
+
+ def on_sendable(self, event):
+ msg = Message(body=HELLO_WORLD)
+ event.sender.send(msg)
+ event.sender.close()
+
+ def on_message(self, event):
+ self.delivery_already_settled = event.delivery.settled
+ if HELLO_WORLD == event.message.body:
+ self.message_received = True
+ else:
+ self.message_received = False
+ event.connection.close()
+
+ def run(self):
+ Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org