You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/02/25 22:29:56 UTC

svn commit: r631002 - in /incubator/qpid/trunk/qpid/python: qpid/queue.py tests/queue.py

Author: rhs
Date: Mon Feb 25 13:29:55 2008
New Revision: 631002

URL: http://svn.apache.org/viewvc?rev=631002&view=rev
Log:
put queue listeners in their own thread

Modified:
    incubator/qpid/trunk/qpid/python/qpid/queue.py
    incubator/qpid/trunk/qpid/python/tests/queue.py

Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=631002&r1=631001&r2=631002&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Mon Feb 25 13:29:55 2008
@@ -24,36 +24,51 @@
 """
 
 from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
 
 class Closed(Exception): pass
 
 class Queue(BaseQueue):
 
   END = object()
+  STOP = object()
 
   def __init__(self, *args, **kwargs):
     BaseQueue.__init__(self, *args, **kwargs)
-    self._real_put = self.put
-    self.listener = self._real_put
+    self.listener = None
+    self.thread = None
 
   def close(self):
     self.put(Queue.END)
 
   def get(self, block = True, timeout = None):
-    self.put = self._real_put
-    try:
-      result = BaseQueue.get(self, block, timeout)
-      if result == Queue.END:
-        # this guarantees that any other waiting threads or any future
-        # calls to get will also result in a Closed exception
-        self.put(Queue.END)
-        raise Closed()
-      else:
-        return result
-    finally:
-      self.put = self.listener
-      pass
+    result = BaseQueue.get(self, block, timeout)
+    if result == Queue.END:
+      # this guarantees that any other waiting threads or any future
+      # calls to get will also result in a Closed exception
+      self.put(Queue.END)
+      raise Closed()
+    else:
+      return result
 
   def listen(self, listener):
     self.listener = listener
-    self.put = self.listener
+    if listener == None:
+      if self.thread != None:
+        self.put(Queue.STOP)
+        self.thread.join()
+        self.thread = None
+    else:
+      if self.thread == None:
+        self.thread = Thread(target = self.run)
+        self.thread.setDaemon(True)
+        self.thread.start()
+
+  def run(self):
+    while True:
+      try:
+        o = self.get()
+        if o == Queue.STOP: break
+        self.listener(o)
+      except Closed:
+        break

Modified: incubator/qpid/trunk/qpid/python/tests/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/queue.py?rev=631002&r1=631001&r2=631002&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/queue.py Mon Feb 25 13:29:55 2008
@@ -30,37 +30,32 @@
   # all the queue functionality.
 
   def test_listen(self):
-    LISTEN = object()
-    GET = object()
-    EMPTY = object()
+    values = []
+    heard = threading.Event()
+    def listener(x):
+      values.append(x)
+      heard.set()
 
     q = Queue(0)
-    values = []
-    q.listen(lambda x: values.append((LISTEN, x)))
+    q.listen(listener)
+    heard.clear()
     q.put(1)
-    assert values[-1] == (LISTEN, 1)
+    heard.wait()
+    assert values[-1] == 1
+    heard.clear()
     q.put(2)
-    assert values[-1] == (LISTEN, 2)
-
-    class Getter(threading.Thread):
+    heard.wait()
+    assert values[-1] == 2
 
-      def run(self):
-        try:
-          values.append((GET, q.get(timeout=10)))
-        except Empty:
-          values.append(EMPTY)
-
-    g = Getter()
-    g.start()
-    # let the other thread reach the get
-    time.sleep(2)
+    q.listen(None)
     q.put(3)
-    g.join()
-
-    assert values[-1] == (GET, 3)
+    assert q.get(3) == 3
+    q.listen(listener)
 
+    heard.clear()
     q.put(4)
-    assert values[-1] == (LISTEN, 4)
+    heard.wait()
+    assert values[-1] == 4
 
   def test_close(self):
     q = Queue(0)