You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/05/09 21:26:29 UTC
svn commit: r654918 - in /incubator/qpid/trunk/qpid/python: qpid/session.py
qpid/spec010.py tests_0-10/message.py
Author: rhs
Date: Fri May 9 12:26:28 2008
New Revision: 654918
URL: http://svn.apache.org/viewvc?rev=654918&view=rev
Log:
QPID-1045 and QPID-1041: added a destination attribute to incoming queues, and added a start() method to incoming queues as syntactic sugar for the verbose message flow idiom
Modified:
incubator/qpid/trunk/qpid/python/qpid/session.py
incubator/qpid/trunk/qpid/python/qpid/spec010.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=654918&r1=654917&r2=654918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Fri May 9 12:26:28 2008
@@ -75,7 +75,7 @@
try:
queue = self._incoming.get(destination)
if queue == None:
- queue = Queue()
+ queue = Incoming(self, destination)
self._incoming[destination] = queue
return queue
finally:
@@ -319,6 +319,17 @@
for range in commands.ranges:
self._completed.add(range.lower, range.upper)
+class Incoming(Queue):
+
+ def __init__(self, session, destination):
+ Queue.__init__(self)
+ self.session = session
+ self.destination = destination
+
+ def start(self):
+ for unit in self.session.credit_unit.values():
+ self.session.message_flow(self.destination, unit, 0xFFFFFFFF)
+
class Delegate:
def __init__(self, session):
Modified: incubator/qpid/trunk/qpid/python/qpid/spec010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec010.py?rev=654918&r1=654917&r2=654918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec010.py Fri May 9 12:26:28 2008
@@ -170,10 +170,14 @@
def __init__(self, name):
self.name = name
+ self._names = ()
+ self._values = ()
+
+ def values(self):
+ return self._values
def __repr__(self):
- return "%s(%s)" % (self.name, ", ".join([k for k in self.__dict__.keys()
- if k != "name"]))
+ return "%s(%s)" % (self.name, ", ".join(self._names))
class Choice(Named, Node):
@@ -192,6 +196,8 @@
enum = Enum(node.name)
node.spec.enums[node.name] = enum
setattr(enum, self.name, self.value)
+ enum._names += (self.name,)
+ enum._values += (self.value,)
class Composite(Type, Coded):
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=654918&r1=654917&r2=654918&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri May 9 12:26:28 2008
@@ -229,6 +229,7 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))
session.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ myqueue = session.incoming("my-consumer")
session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)
@@ -237,7 +238,6 @@
#cancel should stop messages being delivered
session.message_cancel(destination="my-consumer")
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
- myqueue = session.incoming("my-consumer")
msg = myqueue.get(timeout=1)
self.assertEqual("One", msg.body)
try:
@@ -1001,6 +1001,22 @@
self.assertEquals("", msg.body)
session.message_accept(RangedSet(msg.id))
+ def test_incoming_start(self):
+ q = "test_incoming_start"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+ session.message_subscribe(queue=q, destination="msgs")
+ messages = session.incoming("msgs")
+ assert messages.destination == "msgs"
+
+ dp = session.delivery_properties(routing_key=q)
+ session.message_transfer(message=Message(dp, "test"))
+
+ messages.start()
+ msg = messages.get()
+ assert msg.body == "test"
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)