You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/09 13:25:51 UTC

qpid-dispatch git commit: DISPATCH-366 - Use modified+delivery-failed for released messages whose deliveries are in doubt.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 778b0ba23 -> 7279a153b


DISPATCH-366 - Use modified+delivery-failed for released messages whose deliveries are in doubt.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7279a153
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7279a153
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7279a153

Branch: refs/heads/master
Commit: 7279a153b7f04196bcba2876d2e1d7ff2a003919
Parents: 778b0ba
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jun 9 09:22:11 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jun 9 09:22:11 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |  2 +-
 src/router_core/router_core_private.h |  1 +
 src/router_core/transfer.c            | 19 +++++++++
 src/router_node.c                     |  5 ++-
 tests/system_tests_one_router.py      | 65 ++++++++++++++++++++++++++----
 5 files changed, 83 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index b352d92..701a91b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -512,7 +512,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         if (peer) {
             peer->peer = 0;
             if (link->link_direction == QD_OUTGOING)
-                qdr_delivery_release_CT(core, peer);
+                qdr_delivery_failed_CT(core, peer);
             qdr_delivery_decref(peer);
         }
         qdr_delivery_decref(dlv);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 1cd9438..394bd55 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -596,6 +596,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b587baf..40ba82e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -311,6 +311,25 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 }
 
 
+void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+    bool push = dlv->disposition != PN_MODIFIED;
+
+    dlv->disposition = PN_MODIFIED;
+    dlv->settled = true;
+    bool moved = qdr_delivery_settled_CT(core, dlv);
+
+    if (push || moved)
+        qdr_delivery_push_CT(core, dlv);
+
+    //
+    // Remove the unsettled reference
+    //
+    if (moved)
+        qdr_delivery_decref(dlv);
+}
+
+
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index b8e8f0e..819a69f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -859,8 +859,11 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
     //
     // If the disposition has changed, update the proton delivery.
     //
-    if (disp != pn_delivery_remote_state(pnd))
+    if (disp != pn_delivery_remote_state(pnd)) {
+        if (disp == PN_MODIFIED)
+            pn_disposition_set_failed(pn_delivery_local(pnd), true);
         pn_delivery_update(pnd, disp);
+    }
 
     //
     // If the delivery is settled, remove the linkage and settle the proton delivery.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7279a153/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index eb3b6b1..130c05a 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -18,17 +18,11 @@
 #
 
 import unittest
-from proton import Message, PENDING, ACCEPTED, REJECTED
+from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED
 from system_test import TestCase, Qdrouterd, main_module
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, AtMostOnce, AtLeastOnce
 
-# PROTON-828:
-try:
-    from proton import MODIFIED
-except ImportError:
-    from proton import PN_STATUS_MODIFIED as MODIFIED
-
 
 class RouterTest(TestCase):
     """System tests involving a single router"""
@@ -1070,6 +1064,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_18_released_vs_modified(self):
+        test = ReleasedVsModifiedTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -1257,5 +1256,57 @@ class MultiframePresettledTest(MessagingHandler):
         Container(self).run()
 
 
+class ReleasedVsModifiedTest(MessagingHandler):
+    def __init__(self, address):
+        super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False)
+        self.address = address
+        self.dest = "closest.RVMtest"
+        self.error = None
+        self.count      = 10
+        self.accept     = 6
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_released = 0
+        self.n_modified = 0
+
+    def check_if_done(self):
+        if self.n_received == self.accept and self.n_released == self.count - self.accept and self.n_modified == self.accept:
+            self.timer.cancel()
+            self.conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d, received=%d, released=%d, modified=%d" % \
+                     (self.n_sent, self.n_received, self.n_released, self.n_modified)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer     = event.reactor.schedule(5, Timeout(self))
+        self.conn      = event.container.connect(self.address)
+        self.sender    = event.container.create_sender(self.conn, self.dest)
+        self.receiver  = event.container.create_receiver(self.conn, self.dest, name="A")
+        self.receiver.flow(self.accept)
+
+    def on_sendable(self, event):
+        for i in range(self.count - self.n_sent):
+            msg = Message(body="RvM-Test")
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.accept:
+            self.receiver.close()
+
+    def on_released(self, event):
+        if event.delivery.remote_state == Delivery.MODIFIED:
+            self.n_modified += 1
+        else:
+            self.n_released += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org