You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/05/09 11:29:03 UTC

svn commit: r654737 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/SemanticState.cpp python/tests_0-10/message.py

Author: gsim
Date: Fri May  9 02:29:03 2008
New Revision: 654737

URL: http://svn.apache.org/viewvc?rev=654737&view=rev
Log:
QPID-1042: ensure delivery record is kept where accept_mode=not-required, acquire_mode=not-acquired and flow_mode=credit


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=654737&r1=654736&r2=654737&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri May  9 02:29:03 2008
@@ -274,7 +274,7 @@
     allocateCredit(msg.payload);
     DeliveryId deliveryTag =
         parent->deliveryAdapter.deliver(msg, token);
-    if (windowing || ackExpected) {
+    if (windowing || ackExpected || !acquire) {
         parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
     } 
     if (acquire && !ackExpected) {

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=654737&r1=654736&r2=654737&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri May  9 02:29:03 2008
@@ -689,6 +689,32 @@
         #messages should still be on the queue:
         self.assertEquals(10, session.queue_query(queue = "q").message_count)
 
+    def test_acquire_with_no_accept_and_credit_flow(self):
+        """
+        Test that messages received unacquired, with accept not
+        required in credit flow mode, can be acquired.
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
+        session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+        msg = session.incoming("a").get(timeout = 1)
+        self.assertEquals("acquire me", msg.body)
+        #message should still be on the queue:
+        self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+        transfers = RangedSet(msg.id)
+        response = session.message_acquire(transfers)
+        #check that we get notification (i.e. message_acquired)
+        self.assert_(msg.id in response.transfers)
+        #message should have been removed from the queue:
+        self.assertEquals(0, session.queue_query(queue = "q").message_count)
+
     def test_acquire(self):
         """
         Test explicit acquire function
@@ -696,7 +722,6 @@
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
 
-        #use fanout for now:
         session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
 
         session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)