You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/23 21:34:58 UTC

svn commit: r926766 - in /qpid/trunk/qpid/python/qpid: compat.py concurrency.py messaging/driver.py messaging/endpoints.py tests/messaging/endpoints.py

Author: rhs
Date: Tue Mar 23 20:34:58 2010
New Revision: 926766

URL: http://svn.apache.org/viewvc?rev=926766&view=rev
Log:
fixed resource leakage on repeated connection open/close

Modified:
    qpid/trunk/qpid/python/qpid/compat.py
    qpid/trunk/qpid/python/qpid/concurrency.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/compat.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/compat.py (original)
+++ qpid/trunk/qpid/python/qpid/compat.py Tue Mar 23 20:34:58 2010
@@ -84,6 +84,16 @@ if sys.platform in ('win32', 'cygwin'):
     def fileno(self):
       return self.read_sock.fileno()
 
+    def close(self):
+      if self.write_sock is not None:
+        self.write_sock.close()
+        self.write_sock = None
+        self.read_sock.close()
+        self.read_sock = None
+
+    def __del__(self):
+      self.close()
+
   def __repr__(self):
     return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
 
@@ -102,9 +112,8 @@ else:
 
   class PipeWaiter(BaseWaiter):
 
-    def __init__(self, read_fd, write_fd):
-      self.read_fd = read_fd
-      self.write_fd = write_fd
+    def __init__(self):
+      self.read_fd, self.write_fd = os.pipe()
 
     def _do_write(self):
       os.write(self.write_fd, "\0")
@@ -115,8 +124,18 @@ else:
     def fileno(self):
       return self.read_fd
 
+    def close(self):
+      if self.write_fd is not None:
+        os.close(self.write_fd)
+        self.write_fd = None
+        os.close(self.read_fd)
+        self.read_fd = None
+
+    def __del__(self):
+      self.close()
+
     def __repr__(self):
       return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
 
   def selectable_waiter():
-    return PipeWaiter(*os.pipe())
+    return PipeWaiter()

Modified: qpid/trunk/qpid/python/qpid/concurrency.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/concurrency.py (original)
+++ qpid/trunk/qpid/python/qpid/concurrency.py Tue Mar 23 20:34:58 2010
@@ -98,3 +98,9 @@ class Condition:
       self.lock._acquire_restore(st)
       self.waiting.remove(sw)
       self.waiters.append(sw)
+
+  def gc(self):
+    assert self.lock._is_owned()
+    while self.waiters:
+      sw = self.waiters.pop(0)
+      sw.close()

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Mar 23 20:34:58 2010
@@ -342,6 +342,9 @@ class Driver:
   def start(self):
     self._selector.register(self)
 
+  def stop(self):
+    self._selector.unregister(self)
+
   def fileno(self):
     return self._socket.fileno()
 

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Mar 23 20:34:58 2010
@@ -102,7 +102,6 @@ class Connection:
     self.error = None
     from driver import Driver
     self._driver = Driver(self)
-    self._driver.start()
 
   def _wait(self, predicate, timeout=None):
     return self._waiter.wait(predicate, timeout=timeout)
@@ -157,6 +156,7 @@ class Connection:
     Connect to the remote endpoint.
     """
     self._connected = True
+    self._driver.start()
     self._wakeup()
     self._ewait(lambda: self._transport_connected and not self._unlinked(),
                 exc=ConnectError)
@@ -175,6 +175,8 @@ class Connection:
     self._connected = False
     self._wakeup()
     self._ewait(lambda: not self._transport_connected)
+    self._driver.stop()
+    self._condition.gc()
 
   @synchronized
   def connected(self):

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=926766&r1=926765&r2=926766&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Tue Mar 23 20:34:58 2010
@@ -20,7 +20,7 @@
 # setup, usage, teardown, errors(sync), errors(async), stress, soak,
 # boundary-conditions, config
 
-import time
+import errno, os, time
 from qpid import compat
 from qpid.messaging import *
 from qpid.tests.messaging import Base
@@ -48,6 +48,29 @@ class SetupTests(Base):
       # XXX: should verify that e includes appropriate diagnostic info
       pass
 
+  def use_fds(self):
+    fds = []
+    try:
+      while True:
+        fds.append(os.open("/dev/null", os.O_RDONLY))
+    except OSError, e:
+      if e.errno != errno.EMFILE:
+        raise e
+      else:
+        return fds
+
+  def testOpenCloseResourceLeaks(self):
+    fds = self.use_fds()
+    try:
+      for i in range(32):
+        if fds: os.close(fds.pop())
+      for i in xrange(64):
+        conn = Connection.open(self.broker.host, self.broker.port)
+        conn.close()
+    finally:
+      while fds:
+        os.close(fds.pop())
+
 class ConnectionTests(Base):
 
   def setup_connection(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org