You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/05/16 00:15:34 UTC

svn commit: r656871 - in /incubator/qpid/trunk/qpid/python: commands/qpid-config qpid/connection.py qpid/management.py qpid/queue.py qpid/session.py

Author: rhs
Date: Thu May 15 15:15:33 2008
New Revision: 656871

URL: http://svn.apache.org/viewvc?rev=656871&view=rev
Log:
QPID-1064: made qpid-config close the session and connection; added incoming.stop() to cancel incoming messages and join the listener thread; made managementBroker.removeChannel use incoming.stop(); modified session.close to wait on _closed rather than on channel is None

Modified:
    incubator/qpid/trunk/qpid/python/commands/qpid-config
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/queue.py
    incubator/qpid/trunk/qpid/python/qpid/session.py

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=656871&r1=656870&r2=656871&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Thu May 15 15:15:33 2008
@@ -108,6 +108,8 @@
 
     def Disconnect (self):
         self.mclient.removeChannel (self.mchannel)
+        self.session.close(timeout=10)
+        self.conn.close(timeout=10)
 
     def Overview (self):
         self.ConnectToBroker ()

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=656871&r1=656870&r2=656871&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Thu May 15 15:15:33 2008
@@ -102,7 +102,6 @@
       if ssn is not None:
         ssn.channel = None
         ssn.closed()
-        notify(ssn.condition)
         return ssn
     finally:
       self.lock.release()

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=656871&r1=656870&r2=656871&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu May 15 15:15:33 2008
@@ -129,8 +129,8 @@
 
   def shutdown (self):
     self.enabled = False
-    self.ssn.message_cancel (destination="tdest")
-    self.ssn.message_cancel (destination="rdest")
+    self.ssn.incoming("tdest").stop()
+    self.ssn.incoming("rdest").stop()
 
   def topicCb (self, msg):
     """ Receive messages via the topic queue on this channel. """

Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=656871&r1=656870&r2=656871&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Thu May 15 15:15:33 2008
@@ -42,6 +42,9 @@
   def close(self, error = None):
     self.error = error
     self.put(Queue.END)
+    if self.thread is not None:
+      self.thread.join()
+      self.thread = None
 
   def get(self, block = True, timeout = None):
     result = BaseQueue.get(self, block, timeout)

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=656871&r1=656870&r2=656871&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Thu May 15 15:15:33 2008
@@ -110,14 +110,13 @@
       self.channel.session_detach(self.name)
     finally:
       self.invoke_lock.release()
-    if not wait(self.condition, lambda: self.channel is None, timeout):
+    if not wait(self.condition, lambda: self._closed, timeout):
       raise Timeout()
 
   def closed(self):
     self.lock.acquire()
     try:
       if self._closed: return
-      self._closed = True
 
       error = self.error()
       for id in self.results:
@@ -127,6 +126,8 @@
 
       for q in self._incoming.values():
         q.close(error)
+
+      self._closed = True
       notify(self.condition)
     finally:
       self.lock.release()
@@ -344,6 +345,10 @@
     for unit in self.session.credit_unit.values():
       self.session.message_flow(self.destination, unit, 0xFFFFFFFF)
 
+  def stop(self):
+    self.session.message_cancel(self.destination)
+    self.listen(None)
+
 class Delegate:
 
   def __init__(self, session):