You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2016/02/16 20:54:44 UTC

svn commit: r1730741 - in /qpid/trunk/qpid/python: qpid/messaging/driver.py qpid/messaging/endpoints.py qpid/tests/messaging/endpoints.py setup.py

Author: kgiusti
Date: Tue Feb 16 19:54:43 2016
New Revision: 1730741

URL: http://svn.apache.org/viewvc?rev=1730741&view=rev
Log:
QPID-7053: Add a callback that is invoked when asynchronous errors are
detected by the background thread.
QPID-7064: Improve documentation of the asynchronous error callbacks.

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
    qpid/trunk/qpid/python/setup.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1730741&r1=1730740&r2=1730741&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Tue Feb 16 19:54:43 2016
@@ -1404,7 +1404,7 @@ class Engine:
       assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1
     log.debug("RCVD[%s]: %s", ssn.log_id, msg)
-    ssn.message_received(msg)
+    ssn._notify_message_received(msg)
 
 
   def _decode(self, xfr):

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=1730741&r1=1730740&r2=1730741&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Tue Feb 16 19:54:43 2016
@@ -18,7 +18,7 @@
 #
 
 """
-A candidate high level messaging API for python.
+A high level messaging API for python.
 
 Areas that still need work:
 
@@ -44,13 +44,57 @@ log = getLogger("qpid.messaging")
 
 static = staticmethod
 
-class Endpoint:
+class Endpoint(object):
+  """
+  Base class for all endpoint objects types.
+  @undocumented: __init__, __setattr__
+  """
+  def __init__(self):
+    self._async_exception_notify_handler = None
+    self.error = None
 
   def _ecwait(self, predicate, timeout=None):
     result = self._ewait(lambda: self.closed or predicate(), timeout)
     self.check_closed()
     return result
 
+  @synchronized
+  def set_async_exception_notify_handler(self, handler):
+    """
+    Register a callable that will be invoked when the driver thread detects an
+    error on the Endpoint. The callable is invoked with the instance of the
+    Endpoint object passed as the first argument. The second argument is an
+    Exception instance describing the failure.
+
+    @param handler: invoked by the driver thread when an error occurs.
+    @type handler: callable object taking an Endpoint and an Exception as
+    arguments.
+    @return: None
+    @note: The exception will also be raised the next time the application
+    invokes one of the blocking messaging APIs.
+    @warning: B{Use with caution} This callback is invoked in the context of
+    the driver thread. It is B{NOT} safe to call B{ANY} of the messaging APIs
+    from within this callback. This includes any of the Endpoint's methods. The
+    intent of the handler is to provide an efficient way to notify the
+    application that an exception has occurred in the driver thread. This can
+    be useful for those applications that periodically poll the messaging layer
+    for events. In this case the callback can be used to schedule a task that
+    retrieves the error using the Endpoint's get_error() or check_error()
+    methods.
+    """
+    self._async_exception_notify_handler = handler
+
+  def __setattr__(self, name, value):
+    """
+    Intercept any attempt to set the endpoint error flag and invoke the
+    callback if registered.
+    """
+    super(Endpoint, self).__setattr__(name, value)
+    if name == 'error' and value is not None:
+        if self._async_exception_notify_handler:
+            self._async_exception_notify_handler(self, value)
+
+
 class Connection(Endpoint):
 
   """
@@ -129,6 +173,7 @@ class Connection(Endpoint):
     @rtype: Connection
     @return: a disconnected Connection
     """
+    super(Connection, self).__init__()
     # List of all attributes
     opt_keys = ['host', 'transport', 'port', 'heartbeat', 'username', 'password', 'sasl_mechanisms', 'sasl_service', 'sasl_min_ssf', 'sasl_max_ssf', 'reconnect', 'reconnect_timeout', 'reconnect_interval', 'reconnect_interval_min', 'reconnect_interval_max', 'reconnect_limit', 'reconnect_urls', 'reconnect_log', 'address_ttl', 'tcp_nodelay', 'ssl_keyfile', 'ssl_certfile', 'ssl_trustfile', 'ssl_skip_hostname_check', 'client_properties', 'protocol' ]
     # Create all attributes on self and set to None.
@@ -200,7 +245,6 @@ class Connection(Endpoint):
     self._condition = Condition(self._lock)
     self._waiter = Waiter(self._condition)
     self._modcount = Serial(0)
-    self.error = None
     from driver import Driver
     self._driver = Driver(self)
 
@@ -343,6 +387,7 @@ class Connection(Endpoint):
       self.detach(timeout=timeout)
       self._open = False
 
+
 class Session(Endpoint):
 
   """
@@ -538,6 +583,7 @@ class Session(Endpoint):
  """
 
   def __init__(self, connection, name, transactional):
+    super(Session, self).__init__()
     self.connection = connection
     self.name = name
     self.log_id = "%x" % id(self)
@@ -560,12 +606,11 @@ class Session(Endpoint):
     # XXX: I hate this name.
     self.ack_capacity = UNLIMITED
 
-    self.error = None
     self.closing = False
     self.closed = False
 
     self._lock = connection._lock
-    self._msg_received = None
+    self._msg_received_notify_handler = None
 
   def __repr__(self):
     return "<Session %s>" % self.name
@@ -597,10 +642,15 @@ class Session(Endpoint):
     if self.closed:
       raise SessionClosed()
 
-  def message_received(self, msg):
+  def _notify_message_received(self, msg):
       self.incoming.append(msg)
-      if self._msg_received:
-          self._msg_received()
+      if self._msg_received_notify_handler:
+          try:
+              # new callback parameter: the Session
+              self._msg_received_notify_handler(self)
+          except TypeError:
+              # backward compatibility with old API, no Session
+              self._msg_received_notify_handler()
 
   @synchronized
   def sender(self, target, **options):
@@ -687,16 +737,40 @@ class Session(Endpoint):
     return None
 
   @synchronized
+  def set_message_received_notify_handler(self, handler):
+      """
+      Register a callable that will be invoked when a Message arrives on the
+      Session.
+
+      @param handler: invoked by the driver thread when an error occurs.
+      @type handler: a callable object taking a Session instance as its only
+      argument
+      @return: None
+
+      @note: When using this method it is recommended to also register
+      asynchronous error callbacks on all endpoint objects. Doing so will cause
+      the application to be notified if an error is raised by the driver
+      thread. This is necessary as after a driver error occurs the message received
+      callback may never be invoked again. See
+      L{Endpoint.set_async_exception_notify_handler}
+
+      @warning: B{Use with caution} This callback is invoked in the context of
+      the driver thread. It is B{NOT} safe to call B{ANY} of the public
+      messaging APIs from within this callback, including any of the passed
+      Session's methods. The intent of the handler is to provide an efficient
+      way to notify the application that a message has arrived.  This can be
+      useful for those applications that need to schedule a task to poll for
+      received messages without blocking in the messaging API.  The scheduled
+      task may then retrieve the message using L{next_receiver} and
+      L{Receiver.fetch}
+      """
+      self._msg_received_notify_handler = handler
+
+  @synchronized
   def set_message_received_handler(self, handler):
-      """Register a callback that will be invoked when a message arrives on the
-      session.  Use with caution: since this callback is invoked in the context
-      of the driver thread, it is not safe to call any of the public messaging
-      APIs from within this callback.  The intent of the handler is to provide
-      an efficient way to notify the application that a message has arrived.
-      This can be useful for those applications that need to schedule a task
-      to poll for received messages without blocking in the messaging API.
+      """@deprecated: Use L{set_message_received_notify_handler} instead.
       """
-      self._msg_received = handler
+      self._msg_received_notify_handler = handler
 
   @synchronized
   def next_receiver(self, timeout=None):
@@ -803,6 +877,7 @@ class Session(Endpoint):
     finally:
       self.connection._remove_session(self)
 
+
 class MangledString(str): pass
 
 def _mangle(addr):
@@ -818,6 +893,7 @@ class Sender(Endpoint):
   """
 
   def __init__(self, session, id, target, options):
+    super(Sender, self).__init__()
     self.session = session
     self.id = id
     self.target = target
@@ -828,7 +904,6 @@ class Sender(Endpoint):
     self.queued = Serial(0)
     self.synced = Serial(0)
     self.acked = Serial(0)
-    self.error = None
     self.linked = False
     self.closing = False
     self.closed = False
@@ -968,7 +1043,8 @@ class Sender(Endpoint):
       except ValueError:
         pass
 
-class Receiver(Endpoint, object):
+
+class Receiver(Endpoint):
 
   """
   Receives incoming messages from a remote source. Messages may be
@@ -976,6 +1052,7 @@ class Receiver(Endpoint, object):
   """
 
   def __init__(self, session, id, source, options):
+    super(Receiver, self).__init__()
     self.session = session
     self.id = id
     self.source = source
@@ -987,7 +1064,6 @@ class Receiver(Endpoint, object):
     self.received = Serial(0)
     self.returned = Serial(0)
 
-    self.error = None
     self.linked = False
     self.closing = False
     self.closed = False
@@ -1115,4 +1191,5 @@ class Receiver(Endpoint, object):
       except ValueError:
         pass
 
-__all__ = ["Connection", "Session", "Sender", "Receiver"]
+
+__all__ = ["Connection", "Endpoint", "Session", "Sender", "Receiver"]

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=1730741&r1=1730740&r2=1730741&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Tue Feb 16 19:54:43 2016
@@ -667,10 +667,11 @@ class SessionTests(Base):
     class CallbackHandler:
         def __init__(self):
             self.handler_called = False
-        def __call__(self):
+        def __call__(self, ssn):
             self.handler_called = True
+            self.ssn = ssn
     cb = CallbackHandler()
-    self.ssn.set_message_received_handler(cb)
+    self.ssn.set_message_received_notify_handler(cb)
     rcv = self.ssn.receiver(ADDR)
     rcv.capacity = UNLIMITED
     snd = self.ssn.sender(ADDR)
@@ -681,6 +682,7 @@ class SessionTests(Base):
         if cb.handler_called:
             break;
     assert cb.handler_called
+    assert cb.ssn == self.ssn
     snd.close()
     rcv.close()
 
@@ -1385,3 +1387,90 @@ class SenderTests(Base):
     except:
       pass
     self.snd.send(m2, timeout=self.timeout())
+
+
+class ErrorCallbackTests(Base):
+
+  class Callback:
+    def __init__(self, name):
+      self.name = name
+      self.obj = None
+      self.exc = None
+
+    def __call__(self, obj, exc):
+      self.obj = obj
+      self.exc = exc
+
+  def testConnectErrorCallback(self):
+    cb = ErrorCallbackTests.Callback("connection")
+    self.conn = Connection("localhost:4")
+    self.conn.set_async_exception_notify_handler(cb)
+    try:
+      self.conn.open()
+      assert False, "connect succeeded"
+    except Exception:
+      assert self.conn == cb.obj, cb.obj
+      assert cb.name == "connection"
+      assert cb.exc is not None
+
+  def testSessionErrorCallback(self):
+    ccb = ErrorCallbackTests.Callback("connection")
+    self.conn = Connection.establish(self.broker, **self.connection_options())
+    self.conn.set_async_exception_notify_handler(ccb)
+    scb = ErrorCallbackTests.Callback("session")
+    self.ssn = self.conn.session(transactional=True)
+    self.ssn.set_async_exception_notify_handler(scb)
+    self.conn.detach()
+    try:
+      self.ping(self.ssn)
+      assert False, "session succeeded"
+    except Exception:
+      assert self.ssn == scb.obj, scb.obj
+      assert scb.name == "session"
+      assert scb.exc is not None
+      # connection callback should be empty
+      assert ccb.obj == None, ccb.obj
+
+  def testSenderErrorCallback(self):
+    ccb = ErrorCallbackTests.Callback("connection")
+    conn = Connection(self.broker, **self.connection_options())
+    conn.set_async_exception_notify_handler(ccb)
+    scb = ErrorCallbackTests.Callback("session")
+    ssn = conn.session()
+    ssn.set_async_exception_notify_handler(scb)
+    snd = ssn.sender(NOSUCH_Q)
+    sndcb = ErrorCallbackTests.Callback("sender")
+    snd.set_async_exception_notify_handler(sndcb)
+    conn.open()
+    try:
+        snd.send(self.message("HI"))
+        assert False, "send worked"
+    except Exception:
+        assert snd == sndcb.obj, sndcb.obj
+        assert sndcb.name == "sender"
+        assert sndcb.exc is not None
+        # connection and session callbacks are empty
+        assert ccb.obj == None, ccb.obj
+        assert scb.obj == None, scb.obj
+
+  def testReceiverErrorCallback(self):
+    ccb = ErrorCallbackTests.Callback("connection")
+    self.conn = Connection(self.broker, **self.connection_options())
+    self.conn.set_async_exception_notify_handler(ccb)
+    scb = ErrorCallbackTests.Callback("session")
+    self.ssn = self.conn.session()
+    self.ssn.set_async_exception_notify_handler(scb)
+    self.recv = self.ssn.receiver(NOSUCH_Q)
+    rcb = ErrorCallbackTests.Callback("receiver")
+    self.recv.set_async_exception_notify_handler(rcb)
+    self.conn.open()
+    try:
+        self.recv.fetch()
+        assert False, "fetch worked"
+    except Exception:
+        assert self.recv == rcb.obj, rcb.obj
+        assert rcb.name == "receiver"
+        assert rcb.exc is not None
+        # connection and session callbacks are empty
+        assert ccb.obj == None, ccb.obj
+        assert scb.obj == None, scb.obj

Modified: qpid/trunk/qpid/python/setup.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/setup.py?rev=1730741&r1=1730740&r2=1730741&view=diff
==============================================================================
--- qpid/trunk/qpid/python/setup.py (original)
+++ qpid/trunk/qpid/python/setup.py Tue Feb 16 19:54:43 2016
@@ -135,7 +135,7 @@ class build_doc(Command):
 
     names = ["qpid.messaging"]
     doc_index = build_doc_index(names, True, True)
-    html_writer = HTMLWriter(doc_index)
+    html_writer = HTMLWriter(doc_index, show_private=False)
     self.mkpath(self.build_doc)
     log.info('epydoc %s to %s' % (", ".join(names), self.build_doc))
     html_writer.write(self.build_doc)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org