You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/09 13:25:51 UTC
qpid-dispatch git commit: DISPATCH-366 - Use modified+delivery-failed
for released messages whose deliveries are in doubt.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 778b0ba23 -> 7279a153b
DISPATCH-366 - Use modified+delivery-failed for released messages whose deliveries are in doubt.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7279a153
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7279a153
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7279a153
Branch: refs/heads/master
Commit: 7279a153b7f04196bcba2876d2e1d7ff2a003919
Parents: 778b0ba
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jun 9 09:22:11 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jun 9 09:22:11 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 2 +-
src/router_core/router_core_private.h | 1 +
src/router_core/transfer.c | 19 +++++++++
src/router_node.c | 5 ++-
tests/system_tests_one_router.py | 65 ++++++++++++++++++++++++++----
5 files changed, 83 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index b352d92..701a91b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -512,7 +512,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (peer) {
peer->peer = 0;
if (link->link_direction == QD_OUTGOING)
- qdr_delivery_release_CT(core, peer);
+ qdr_delivery_failed_CT(core, peer);
qdr_delivery_decref(peer);
}
qdr_delivery_decref(dlv);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 1cd9438..394bd55 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -596,6 +596,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b587baf..40ba82e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -311,6 +311,25 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
}
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ bool push = dlv->disposition != PN_MODIFIED;
+
+ dlv->disposition = PN_MODIFIED;
+ dlv->settled = true;
+ bool moved = qdr_delivery_settled_CT(core, dlv);
+
+ if (push || moved)
+ qdr_delivery_push_CT(core, dlv);
+
+ //
+ // Remove the unsettled reference
+ //
+ if (moved)
+ qdr_delivery_decref(dlv);
+}
+
+
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index b8e8f0e..819a69f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -859,8 +859,11 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
//
// If the disposition has changed, update the proton delivery.
//
- if (disp != pn_delivery_remote_state(pnd))
+ if (disp != pn_delivery_remote_state(pnd)) {
+ if (disp == PN_MODIFIED)
+ pn_disposition_set_failed(pn_delivery_local(pnd), true);
pn_delivery_update(pnd, disp);
+ }
//
// If the delivery is settled, remove the linkage and settle the proton delivery.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index eb3b6b1..130c05a 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -18,17 +18,11 @@
#
import unittest
-from proton import Message, PENDING, ACCEPTED, REJECTED
+from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED
from system_test import TestCase, Qdrouterd, main_module
from proton.handlers import MessagingHandler
from proton.reactor import Container, AtMostOnce, AtLeastOnce
-# PROTON-828:
-try:
- from proton import MODIFIED
-except ImportError:
- from proton import PN_STATUS_MODIFIED as MODIFIED
-
class RouterTest(TestCase):
"""System tests involving a single router"""
@@ -1070,6 +1064,11 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_18_released_vs_modified(self):
+ test = ReleasedVsModifiedTest(self.address)
+ test.run()
+ self.assertEqual(None, test.error)
+
class Timeout(object):
def __init__(self, parent):
@@ -1257,5 +1256,57 @@ class MultiframePresettledTest(MessagingHandler):
Container(self).run()
+class ReleasedVsModifiedTest(MessagingHandler):
+ def __init__(self, address):
+ super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False)
+ self.address = address
+ self.dest = "closest.RVMtest"
+ self.error = None
+ self.count = 10
+ self.accept = 6
+ self.n_sent = 0
+ self.n_received = 0
+ self.n_released = 0
+ self.n_modified = 0
+
+ def check_if_done(self):
+ if self.n_received == self.accept and self.n_released == self.count - self.accept and self.n_modified == self.accept:
+ self.timer.cancel()
+ self.conn.close()
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d, received=%d, released=%d, modified=%d" % \
+ (self.n_sent, self.n_received, self.n_released, self.n_modified)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn = event.container.connect(self.address)
+ self.sender = event.container.create_sender(self.conn, self.dest)
+ self.receiver = event.container.create_receiver(self.conn, self.dest, name="A")
+ self.receiver.flow(self.accept)
+
+ def on_sendable(self, event):
+ for i in range(self.count - self.n_sent):
+ msg = Message(body="RvM-Test")
+ event.sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ self.n_received += 1
+ if self.n_received == self.accept:
+ self.receiver.close()
+
+ def on_released(self, event):
+ if event.delivery.remote_state == Delivery.MODIFIED:
+ self.n_modified += 1
+ else:
+ self.n_released += 1
+ self.check_if_done()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org