You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/02/25 22:29:56 UTC
svn commit: r631002 - in /incubator/qpid/trunk/qpid/python: qpid/queue.py
tests/queue.py
Author: rhs
Date: Mon Feb 25 13:29:55 2008
New Revision: 631002
URL: http://svn.apache.org/viewvc?rev=631002&view=rev
Log:
put queue listeners in their own thread
Modified:
incubator/qpid/trunk/qpid/python/qpid/queue.py
incubator/qpid/trunk/qpid/python/tests/queue.py
Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=631002&r1=631001&r2=631002&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Mon Feb 25 13:29:55 2008
@@ -24,36 +24,51 @@
"""
from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
class Closed(Exception): pass
class Queue(BaseQueue):
END = object()
+ STOP = object()
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
- self._real_put = self.put
- self.listener = self._real_put
+ self.listener = None
+ self.thread = None
def close(self):
self.put(Queue.END)
def get(self, block = True, timeout = None):
- self.put = self._real_put
- try:
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
- finally:
- self.put = self.listener
- pass
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
def listen(self, listener):
self.listener = listener
- self.put = self.listener
+ if listener == None:
+ if self.thread != None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+ else:
+ if self.thread == None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed:
+ break
Modified: incubator/qpid/trunk/qpid/python/tests/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/queue.py?rev=631002&r1=631001&r2=631002&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/queue.py Mon Feb 25 13:29:55 2008
@@ -30,37 +30,32 @@
# all the queue functionality.
def test_listen(self):
- LISTEN = object()
- GET = object()
- EMPTY = object()
+ values = []
+ heard = threading.Event()
+ def listener(x):
+ values.append(x)
+ heard.set()
q = Queue(0)
- values = []
- q.listen(lambda x: values.append((LISTEN, x)))
+ q.listen(listener)
+ heard.clear()
q.put(1)
- assert values[-1] == (LISTEN, 1)
+ heard.wait()
+ assert values[-1] == 1
+ heard.clear()
q.put(2)
- assert values[-1] == (LISTEN, 2)
-
- class Getter(threading.Thread):
+ heard.wait()
+ assert values[-1] == 2
- def run(self):
- try:
- values.append((GET, q.get(timeout=10)))
- except Empty:
- values.append(EMPTY)
-
- g = Getter()
- g.start()
- # let the other thread reach the get
- time.sleep(2)
+ q.listen(None)
q.put(3)
- g.join()
-
- assert values[-1] == (GET, 3)
+ assert q.get(3) == 3
+ q.listen(listener)
+ heard.clear()
q.put(4)
- assert values[-1] == (LISTEN, 4)
+ heard.wait()
+ assert values[-1] == 4
def test_close(self):
q = Queue(0)