You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/06/25 19:09:06 UTC

svn commit: r958037 - in /qpid/trunk/qpid/python/qpid: messaging/driver.py messaging/endpoints.py messaging/exceptions.py tests/messaging/__init__.py tests/messaging/endpoints.py

Author: rhs
Date: Fri Jun 25 17:09:05 2010
New Revision: 958037

URL: http://svn.apache.org/viewvc?rev=958037&view=rev
Log:
added optional timeouts to {connection,session,sender,receiver}.close() as well as connection.detach() and {session,sender}.sync()

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/messaging/exceptions.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=958037&r1=958036&r2=958037&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Jun 25 17:09:05 2010
@@ -357,6 +357,8 @@ class Driver:
 
   def stop(self):
     self._selector.unregister(self)
+    if self._transport:
+      self.st_closed()
 
   def fileno(self):
     return self._transport.fileno()

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=958037&r1=958036&r2=958037&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Fri Jun 25 17:09:05 2010
@@ -251,15 +251,18 @@ class Connection:
             if not (l.linked or l.error or l.closed)]
 
   @synchronized
-  def detach(self):
+  def detach(self, timeout=None):
     """
     Detach from the remote endpoint.
     """
     self._connected = False
     self._wakeup()
-    self._wait(lambda: not self._transport_connected)
-    self._driver.stop()
-    self._condition.gc()
+    try:
+      if not self._wait(lambda: not self._transport_connected, timeout=timeout):
+        raise Timeout("detach timed out")
+    finally:
+      self._driver.stop()
+      self._condition.gc()
 
   @synchronized
   def attached(self):
@@ -269,15 +272,15 @@ class Connection:
     return self._connected
 
   @synchronized
-  def close(self):
+  def close(self, timeout=None):
     """
     Close the connection and all sessions.
     """
     try:
       for ssn in self.sessions.values():
-        ssn.close()
+        ssn.close(timeout=timeout)
     finally:
-      self.detach()
+      self.detach(timeout=timeout)
       self._open = False
 
 class Session:
@@ -677,28 +680,32 @@ class Session:
     assert self.aborted
 
   @synchronized
-  def sync(self):
+  def sync(self, timeout=None):
     """
     Sync the session.
     """
     for snd in self.senders:
-      snd.sync()
-    self._ewait(lambda: not self.outgoing and not self.acked)
+      snd.sync(timeout=timeout)
+    if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
+      raise Timeout("session sync timed out")
 
   @synchronized
-  def close(self):
+  def close(self, timeout=None):
     """
     Close the session.
     """
-    self.sync()
+    self.sync(timeout=timeout)
 
     for link in self.receivers + self.senders:
-      link.close()
+      link.close(timeout=timeout)
 
     self.closing = True
     self._wakeup()
-    self._ewait(lambda: self.closed)
-    self.connection._remove_session(self)
+    try:
+      if not self._ewait(lambda: self.closed, timeout=timeout):
+        raise Timeout("session close timed out")
+    finally:
+      self.connection._remove_session(self)
 
 class Sender:
 
@@ -816,22 +823,29 @@ class Sender:
       self._wakeup()
 
   @synchronized
-  def sync(self):
+  def sync(self, timeout=None):
     mno = self.queued
     if self.synced < mno:
       self.synced = mno
       self._wakeup()
-    self._ewait(lambda: self.acked >= mno)
+    if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
+      raise Timeout("sender sync timed out")
 
   @synchronized
-  def close(self):
+  def close(self, timeout=None):
     """
     Close the Sender.
     """
+    # avoid erroring out when closing a sender that was never
+    # established
+    if self.acked < self.queued:
+      self.sync(timeout=timeout)
+
     self.closing = True
     self._wakeup()
     try:
-      self.session._ewait(lambda: self.closed)
+      if not self.session._ewait(lambda: self.closed, timeout=timeout):
+        raise Timeout("sender close timed out")
     finally:
       self.session.senders.remove(self)
 
@@ -962,14 +976,15 @@ class Receiver(object):
       self.granted = self.received + self._capacity
 
   @synchronized
-  def close(self):
+  def close(self, timeout=None):
     """
     Close the receiver.
     """
     self.closing = True
     self._wakeup()
     try:
-      self.session._ewait(lambda: self.closed)
+      if not self.session._ewait(lambda: self.closed, timeout=timeout):
+        raise Timeout("receiver close timed out")
     finally:
       self.session.receivers.remove(self)
 

Modified: qpid/trunk/qpid/python/qpid/messaging/exceptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/exceptions.py?rev=958037&r1=958036&r2=958037&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/exceptions.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/exceptions.py Fri Jun 25 17:09:05 2010
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+class Timeout(Exception):
+  pass
+
+## Messaging Errors
+
 class MessagingError(Exception):
 
   def __init__(self, code=None, text=None, **info):

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=958037&r1=958036&r2=958037&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Fri Jun 25 17:09:05 2010
@@ -51,7 +51,11 @@ class Base(Test):
 
   def teardown(self):
     if self.conn is not None and self.conn.attached():
-      self.conn.close()
+      self.teardown_connection(self.conn)
+      self.conn = None
+
+  def teardown_connection(self, conn):
+    conn.close()
 
   def content(self, base, count = None):
     if count is None:

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=958037&r1=958036&r2=958037&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Fri Jun 25 17:09:05 2010
@@ -20,10 +20,11 @@
 # setup, usage, teardown, errors(sync), errors(async), stress, soak,
 # boundary-conditions, config
 
-import errno, os, time
+import errno, os, socket, time
 from qpid import compat
 from qpid.compat import set
 from qpid.messaging import *
+from qpid.messaging.transports import TRANSPORTS
 from qpid.tests.messaging import Base
 
 class SetupTests(Base):
@@ -98,8 +99,6 @@ class SetupTests(Base):
 
   def testReconnect(self):
     options = self.connection_options()
-    import socket
-    from qpid.messaging.transports import TRANSPORTS
     real = TRANSPORTS["tcp"]
 
     class flaky:
@@ -213,6 +212,104 @@ class ConnectionTests(Base):
     self.conn.close()
     assert not self.conn.attached()
 
+class hangable:
+
+  def __init__(self, host, port):
+    self.tcp = TRANSPORTS["tcp"](host, port)
+    self.hung = False
+
+  def hang(self):
+    self.hung = True
+
+  def fileno(self):
+    return self.tcp.fileno()
+
+  def reading(self, reading):
+    if self.hung:
+      return True
+    else:
+      return self.tcp.reading(reading)
+
+  def writing(self, writing):
+    if self.hung:
+      return False
+    else:
+      return self.tcp.writing(writing)
+
+  def send(self, bytes):
+    if self.hung:
+      return 0
+    else:
+      return self.tcp.send(bytes)
+
+  def recv(self, n):
+    if self.hung:
+      return ""
+    else:
+      return self.tcp.recv(n)
+
+  def close(self):
+    self.tcp.close()
+
+TRANSPORTS["hangable"] = hangable
+
+class TimeoutTests(Base):
+
+  def setup_connection(self):
+    options = self.connection_options()
+    options["transport"] = "hangable"
+    return Connection.establish(self.broker, **options)
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def setup_sender(self):
+    return self.ssn.sender("amq.topic")
+
+  def setup_receiver(self):
+    return self.ssn.receiver("amq.topic")
+
+  def teardown_connection(self, conn):
+    try:
+      conn.detach(timeout=0)
+    except Timeout:
+      pass
+
+  def hang(self):
+    self.conn._driver._transport.hang()
+
+  def timeoutTest(self, method):
+    self.hang()
+    try:
+      method(timeout=self.delay())
+      assert False, "did not time out"
+    except Timeout:
+      pass
+
+  def testSenderSync(self):
+    self.snd.send(self.content("testSenderSync"), sync=False)
+    self.timeoutTest(self.snd.sync)
+
+  def testSenderClose(self):
+    self.snd.send(self.content("testSenderClose"), sync=False)
+    self.timeoutTest(self.snd.close)
+
+  def testReceiverClose(self):
+    self.timeoutTest(self.rcv.close)
+
+  def testSessionSync(self):
+    self.snd.send(self.content("testSessionSync"), sync=False)
+    self.timeoutTest(self.ssn.sync)
+
+  def testSessionClose(self):
+    self.timeoutTest(self.ssn.close)
+
+  def testConnectionDetach(self):
+    self.timeoutTest(self.conn.detach)
+
+  def testConnectionClose(self):
+    self.timeoutTest(self.conn.close)
+
 ACK_QC = 'test-ack-queue; {create: always}'
 ACK_QD = 'test-ack-queue; {delete: always}'
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org