You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/05/09 11:29:03 UTC
svn commit: r654737 - in /incubator/qpid/trunk/qpid:
cpp/src/qpid/broker/SemanticState.cpp python/tests_0-10/message.py
Author: gsim
Date: Fri May 9 02:29:03 2008
New Revision: 654737
URL: http://svn.apache.org/viewvc?rev=654737&view=rev
Log:
QPID-1042: ensure delievery record is kept where accept_mode=not-required, acquire_mode=not-acquired and flow_mode=credit
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=654737&r1=654736&r2=654737&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri May 9 02:29:03 2008
@@ -274,7 +274,7 @@
allocateCredit(msg.payload);
DeliveryId deliveryTag =
parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
+ if (windowing || ackExpected || !acquire) {
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
}
if (acquire && !ackExpected) {
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=654737&r1=654736&r2=654737&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri May 9 02:29:03 2008
@@ -689,6 +689,32 @@
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
+ def test_acquire_with_no_accept_and_credit_flow(self):
+ """
+ Test that messages recieved unacquired, with accept not
+ required in windowing mode can be acquired.
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
+ session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
+ #message should still be on the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
+ #check that we get notification (i.e. message_acquired)
+ self.assert_(msg.id in response.transfers)
+ #message should have been removed from the queue:
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+
def test_acquire(self):
"""
Test explicit acquire function
@@ -696,7 +722,6 @@
session = self.session
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- #use fanout for now:
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)