You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/08/28 21:39:06 UTC

[6/7] qpid-proton git commit: PROTON-1922: [Python] Restrict exported symbols from proton submodules - Restricted symbols exported by proton.reactor, proton.handlers, proton.utils - All symbols used by tests and examples are exported - Other symbols that

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 8b798f8..c324812 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -19,727 +19,22 @@
 
 from __future__ import absolute_import
 
-import logging
-import time
-import weakref
-from select import select
-
-from ._delivery import Delivery
-from ._endpoints import Endpoint
-from ._message import Message
-from ._exceptions import ProtonException
-from ._events import Handler, _dispatch
-
-log = logging.getLogger("proton")
-
-
-class OutgoingMessageHandler(Handler):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to outgoing i.e. sent messages.
-    """
-
-    def __init__(self, auto_settle=True, delegate=None):
-        self.auto_settle = auto_settle
-        self.delegate = delegate
-
-    def on_link_flow(self, event):
-        if event.link.is_sender and event.link.credit \
-                and event.link.state & Endpoint.LOCAL_ACTIVE \
-                and event.link.state & Endpoint.REMOTE_ACTIVE:
-            self.on_sendable(event)
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if dlv.link.is_sender and dlv.updated:
-            if dlv.remote_state == Delivery.ACCEPTED:
-                self.on_accepted(event)
-            elif dlv.remote_state == Delivery.REJECTED:
-                self.on_rejected(event)
-            elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
-                self.on_released(event)
-            if dlv.settled:
-                self.on_settled(event)
-            if self.auto_settle:
-                dlv.settle()
-
-    def on_sendable(self, event):
-        """
-        Called when the sender link has credit and messages can
-        therefore be transferred.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_sendable', event)
-
-    def on_accepted(self, event):
-        """
-        Called when the remote peer accepts an outgoing message.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_accepted', event)
-
-    def on_rejected(self, event):
-        """
-        Called when the remote peer rejects an outgoing message.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_rejected', event)
-
-    def on_released(self, event):
-        """
-        Called when the remote peer releases an outgoing message. Note
-        that this may be in response to either the RELEASE or MODIFIED
-        state as defined by the AMQP specification.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_released', event)
-
-    def on_settled(self, event):
-        """
-        Called when the remote peer has settled the outgoing
-        message. This is the point at which it should never be
-        retransmitted.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_settled', event)
-
-
-def recv_msg(delivery):
-    msg = Message()
-    msg.decode(delivery.link.recv(delivery.pending))
-    delivery.link.advance()
-    return msg
-
-
-class Reject(ProtonException):
-    """
-    An exception that indicate a message should be rejected
-    """
-    pass
-
-
-class Release(ProtonException):
-    """
-    An exception that indicate a message should be rejected
-    """
-    pass
-
-
-class Acking(object):
-    def accept(self, delivery):
-        """
-        Accepts a received message.
-
-        Note that this method cannot currently be used in combination
-        with transactions.
-        """
-        self.settle(delivery, Delivery.ACCEPTED)
-
-    def reject(self, delivery):
-        """
-        Rejects a received message that is considered invalid or
-        unprocessable.
-        """
-        self.settle(delivery, Delivery.REJECTED)
-
-    def release(self, delivery, delivered=True):
-        """
-        Releases a received message, making it available at the source
-        for any (other) interested receiver. The ``delivered``
-        parameter indicates whether this should be considered a
-        delivery attempt (and the delivery count updated) or not.
-        """
-        if delivered:
-            self.settle(delivery, Delivery.MODIFIED)
-        else:
-            self.settle(delivery, Delivery.RELEASED)
-
-    def settle(self, delivery, state=None):
-        if state:
-            delivery.update(state)
-        delivery.settle()
-
-
-class IncomingMessageHandler(Handler, Acking):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to incoming i.e. received messages.
-    """
-
-    def __init__(self, auto_accept=True, delegate=None):
-        self.delegate = delegate
-        self.auto_accept = auto_accept
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if not dlv.link.is_receiver: return
-        if dlv.aborted:
-            self.on_aborted(event)
-            dlv.settle()
-        elif dlv.readable and not dlv.partial:
-            event.message = recv_msg(dlv)
-            if event.link.state & Endpoint.LOCAL_CLOSED:
-                if self.auto_accept:
-                    dlv.update(Delivery.RELEASED)
-                    dlv.settle()
-            else:
-                try:
-                    self.on_message(event)
-                    if self.auto_accept:
-                        dlv.update(Delivery.ACCEPTED)
-                        dlv.settle()
-                except Reject:
-                    dlv.update(Delivery.REJECTED)
-                    dlv.settle()
-                except Release:
-                    dlv.update(Delivery.MODIFIED)
-                    dlv.settle()
-        elif dlv.updated and dlv.settled:
-            self.on_settled(event)
-
-    def on_message(self, event):
-        """
-        Called when a message is received. The message itself can be
-        obtained as a property on the event. For the purpose of
-        referring to this message in further actions (e.g. if
-        explicitly accepting it, the ``delivery`` should be used, also
-        obtainable via a property on the event.
-        """
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_message', event)
-
-    def on_settled(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_settled', event)
-
-    def on_aborted(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_aborted', event)
-
-
-class EndpointStateHandler(Handler):
-    """
-    A utility that exposes 'endpoint' events i.e. the open/close for
-    links, sessions and connections in a more intuitive manner. A
-    XXX_opened method will be called when both local and remote peers
-    have opened the link, session or connection. This can be used to
-    confirm a locally initiated action for example. A XXX_opening
-    method will be called when the remote peer has requested an open
-    that was not initiated locally. By default this will simply open
-    locally, which then triggers the XXX_opened call. The same applies
-    to close.
-    """
-
-    def __init__(self, peer_close_is_error=False, delegate=None):
-        self.delegate = delegate
-        self.peer_close_is_error = peer_close_is_error
-
-    @classmethod
-    def is_local_open(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_ACTIVE
-
-    @classmethod
-    def is_local_uninitialised(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_UNINIT
-
-    @classmethod
-    def is_local_closed(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_CLOSED
-
-    @classmethod
-    def is_remote_open(cls, endpoint):
-        return endpoint.state & Endpoint.REMOTE_ACTIVE
-
-    @classmethod
-    def is_remote_closed(cls, endpoint):
-        return endpoint.state & Endpoint.REMOTE_CLOSED
-
-    @classmethod
-    def print_error(cls, endpoint, endpoint_type):
-        if endpoint.remote_condition:
-            log.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
-        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
-            log.error("%s closed by peer" % endpoint_type)
-
-    def on_link_remote_close(self, event):
-        if event.link.remote_condition:
-            self.on_link_error(event)
-        elif self.is_local_closed(event.link):
-            self.on_link_closed(event)
-        else:
-            self.on_link_closing(event)
-        event.link.close()
-
-    def on_session_remote_close(self, event):
-        if event.session.remote_condition:
-            self.on_session_error(event)
-        elif self.is_local_closed(event.session):
-            self.on_session_closed(event)
-        else:
-            self.on_session_closing(event)
-        event.session.close()
-
-    def on_connection_remote_close(self, event):
-        if event.connection.remote_condition:
-            if event.connection.remote_condition.name == "amqp:connection:forced":
-                # Treat this the same as just having the transport closed by the peer without
-                # sending any events. Allow reconnection to happen transparently.
-                return
-            self.on_connection_error(event)
-        elif self.is_local_closed(event.connection):
-            self.on_connection_closed(event)
-        else:
-            self.on_connection_closing(event)
-        event.connection.close()
-
-    def on_connection_local_open(self, event):
-        if self.is_remote_open(event.connection):
-            self.on_connection_opened(event)
-
-    def on_connection_remote_open(self, event):
-        if self.is_local_open(event.connection):
-            self.on_connection_opened(event)
-        elif self.is_local_uninitialised(event.connection):
-            self.on_connection_opening(event)
-            event.connection.open()
-
-    def on_session_local_open(self, event):
-        if self.is_remote_open(event.session):
-            self.on_session_opened(event)
-
-    def on_session_remote_open(self, event):
-        if self.is_local_open(event.session):
-            self.on_session_opened(event)
-        elif self.is_local_uninitialised(event.session):
-            self.on_session_opening(event)
-            event.session.open()
-
-    def on_link_local_open(self, event):
-        if self.is_remote_open(event.link):
-            self.on_link_opened(event)
-
-    def on_link_remote_open(self, event):
-        if self.is_local_open(event.link):
-            self.on_link_opened(event)
-        elif self.is_local_uninitialised(event.link):
-            self.on_link_opening(event)
-            event.link.open()
-
-    def on_connection_opened(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_connection_opened', event)
-
-    def on_session_opened(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_session_opened', event)
-
-    def on_link_opened(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_link_opened', event)
-
-    def on_connection_opening(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_connection_opening', event)
-
-    def on_session_opening(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_session_opening', event)
-
-    def on_link_opening(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_link_opening', event)
-
-    def on_connection_error(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_connection_error', event)
-        else:
-            self.log_error(event.connection, "connection")
-
-    def on_session_error(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_session_error', event)
-        else:
-            self.log_error(event.session, "session")
-            event.connection.close()
-
-    def on_link_error(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_link_error', event)
-        else:
-            self.log_error(event.link, "link")
-            event.connection.close()
-
-    def on_connection_closed(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_connection_closed', event)
-
-    def on_session_closed(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_session_closed', event)
-
-    def on_link_closed(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_link_closed', event)
-
-    def on_connection_closing(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_connection_closing', event)
-        elif self.peer_close_is_error:
-            self.on_connection_error(event)
-
-    def on_session_closing(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_session_closing', event)
-        elif self.peer_close_is_error:
-            self.on_session_error(event)
-
-    def on_link_closing(self, event):
-        if self.delegate is not None:
-            _dispatch(self.delegate, 'on_link_closing', event)
-        elif self.peer_close_is_error:
-            self.on_link_error(event)
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        if self.delegate is not None and event.connection and self.is_local_open(event.connection):
-            _dispatch(self.delegate, 'on_disconnected', event)
-
-
-class MessagingHandler(Handler, Acking):
-    """
-    A general purpose handler that makes the proton-c events somewhat
-    simpler to deal with and/or avoids repetitive tasks for common use
-    cases.
-    """
-
-    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
-        self.handlers = []
-        if prefetch:
-            self.handlers.append(FlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
-        self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
-        self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
-        self.fatal_conditions = ["amqp:unauthorized-access"]
-
-    def on_transport_error(self, event):
-        """
-        Called when some error is encountered with the transport over
-        which the AMQP connection is to be established. This includes
-        authentication errors as well as socket errors.
-        """
-        if event.transport.condition:
-            if event.transport.condition.info:
-                log.error("%s: %s: %s" % (
-                    event.transport.condition.name, event.transport.condition.description,
-                    event.transport.condition.info))
-            else:
-                log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
-            if event.transport.condition.name in self.fatal_conditions:
-                event.connection.close()
-        else:
-            logging.error("Unspecified transport error")
-
-    def on_connection_error(self, event):
-        """
-        Called when the peer closes the connection with an error condition.
-        """
-        EndpointStateHandler.print_error(event.connection, "connection")
-
-    def on_session_error(self, event):
-        """
-        Called when the peer closes the session with an error condition.
-        """
-        EndpointStateHandler.print_error(event.session, "session")
-        event.connection.close()
-
-    def on_link_error(self, event):
-        """
-        Called when the peer closes the link with an error condition.
-        """
-        EndpointStateHandler.print_error(event.link, "link")
-        event.connection.close()
-
-    def on_reactor_init(self, event):
-        """
-        Called when the event loop - the reactor - starts.
-        """
-        if hasattr(event.reactor, 'subclass'):
-            setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
-        self.on_start(event)
-
-    def on_start(self, event):
-        """
-        Called when the event loop starts. (Just an alias for on_reactor_init)
-        """
-        pass
-
-    def on_connection_closed(self, event):
-        """
-        Called when the connection is closed.
-        """
-        pass
-
-    def on_session_closed(self, event):
-        """
-        Called when the session is closed.
-        """
-        pass
-
-    def on_link_closed(self, event):
-        """
-        Called when the link is closed.
-        """
-        pass
-
-    def on_connection_closing(self, event):
-        """
-        Called when the peer initiates the closing of the connection.
-        """
-        pass
-
-    def on_session_closing(self, event):
-        """
-        Called when the peer initiates the closing of the session.
-        """
-        pass
-
-    def on_link_closing(self, event):
-        """
-        Called when the peer initiates the closing of the link.
-        """
-        pass
-
-    def on_disconnected(self, event):
-        """
-        Called when the socket is disconnected.
-        """
-        pass
-
-    def on_sendable(self, event):
-        """
-        Called when the sender link has credit and messages can
-        therefore be transferred.
-        """
-        pass
-
-    def on_accepted(self, event):
-        """
-        Called when the remote peer accepts an outgoing message.
-        """
-        pass
-
-    def on_rejected(self, event):
-        """
-        Called when the remote peer rejects an outgoing message.
-        """
-        pass
-
-    def on_released(self, event):
-        """
-        Called when the remote peer releases an outgoing message. Note
-        that this may be in response to either the RELEASE or MODIFIED
-        state as defined by the AMQP specification.
-        """
-        pass
-
-    def on_settled(self, event):
-        """
-        Called when the remote peer has settled the outgoing
-        message. This is the point at which it should never be
-        retransmitted.
-        """
-        pass
-
-    def on_message(self, event):
-        """
-        Called when a message is received. The message itself can be
-        obtained as a property on the event. For the purpose of
-        referring to this message in further actions (e.g. if
-        explicitly accepting it, the ``delivery`` should be used, also
-        obtainable via a property on the event.
-        """
-        pass
-
-
-class TransactionHandler(object):
-    """
-    The interface for transaction handlers, i.e. objects that want to
-    be notified of state changes related to a transaction.
-    """
-
-    def on_transaction_declared(self, event):
-        pass
-
-    def on_transaction_committed(self, event):
-        pass
-
-    def on_transaction_aborted(self, event):
-        pass
-
-    def on_transaction_declare_failed(self, event):
-        pass
-
-    def on_transaction_commit_failed(self, event):
-        pass
-
-
-class TransactionalClientHandler(MessagingHandler, TransactionHandler):
-    """
-    An extension to the MessagingHandler for applications using
-    transactions.
-    """
-
-    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
-        super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
-
-    def accept(self, delivery, transaction=None):
-        if transaction:
-            transaction.accept(delivery)
-        else:
-            super(TransactionalClientHandler, self).accept(delivery)
-
-
-class FlowController(Handler):
-    def __init__(self, window=1024):
-        self._window = window
-        self._drained = 0
-
-    def on_link_local_open(self, event):
-        self._flow(event.link)
-
-    def on_link_remote_open(self, event):
-        self._flow(event.link)
-
-    def on_link_flow(self, event):
-        self._flow(event.link)
-
-    def on_delivery(self, event):
-        self._flow(event.link)
-
-    def _flow(self, link):
-        if link.is_receiver:
-            self._drained += link.drained()
-            if self._drained == 0:
-                delta = self._window - link.credit
-                link.flow(delta)
-
-
-class Handshaker(Handler):
-
-    @staticmethod
-    def on_connection_remote_open(event):
-        conn = event.connection
-        if conn.state & Endpoint.LOCAL_UNINIT:
-            conn.open()
-
-    @staticmethod
-    def on_session_remote_open(event):
-        ssn = event.session
-        if ssn.state() & Endpoint.LOCAL_UNINIT:
-            ssn.open()
-
-    @staticmethod
-    def on_link_remote_open(event):
-        link = event.link
-        if link.state & Endpoint.LOCAL_UNINIT:
-            link.source.copy(link.remote_source)
-            link.target.copy(link.remote_target)
-            link.open()
-
-    @staticmethod
-    def on_connection_remote_close(event):
-        conn = event.connection
-        if not conn.state & Endpoint.LOCAL_CLOSED:
-            conn.close()
-
-    @staticmethod
-    def on_session_remote_close(event):
-        ssn = event.session
-        if not ssn.state & Endpoint.LOCAL_CLOSED:
-            ssn.close()
-
-    @staticmethod
-    def on_link_remote_close(event):
-        link = event.link
-        if not link.state & Endpoint.LOCAL_CLOSED:
-            link.close()
-
-
-# Back compatibility definitions
-CFlowController = FlowController
-CHandshaker = Handshaker
-
-
-from ._events import WrappedHandler
-from cproton import pn_iohandler
-
-class IOHandler(WrappedHandler):
-
-    def __init__(self):
-        WrappedHandler.__init__(self, pn_iohandler)
-
-
-class PythonIO:
-
-    def __init__(self):
-        self.selectables = []
-        self.delegate = IOHandler()
-
-    def on_unhandled(self, method, event):
-        event.dispatch(self.delegate)
-
-    def on_selectable_init(self, event):
-        self.selectables.append(event.context)
-
-    def on_selectable_updated(self, event):
-        pass
-
-    def on_selectable_final(self, event):
-        sel = event.context
-        if sel.is_terminal:
-            self.selectables.remove(sel)
-            sel.release()
-
-    def on_reactor_quiesced(self, event):
-        reactor = event.reactor
-        # check if we are still quiesced, other handlers of
-        # on_reactor_quiesced could have produced events to process
-        if not reactor.quiesced: return
-
-        reading = []
-        writing = []
-        deadline = None
-        for sel in self.selectables:
-            if sel.reading:
-                reading.append(sel)
-            if sel.writing:
-                writing.append(sel)
-            if sel.deadline:
-                if deadline is None:
-                    deadline = sel.deadline
-                else:
-                    deadline = min(sel.deadline, deadline)
-
-        if deadline is not None:
-            timeout = deadline - time.time()
-        else:
-            timeout = reactor.timeout
-        if (timeout < 0): timeout = 0
-        timeout = min(timeout, reactor.timeout)
-        readable, writable, _ = select(reading, writing, [], timeout)
-
-        reactor.mark()
-
-        now = time.time()
-
-        for s in readable:
-            s.readable()
-        for s in writable:
-            s.writable()
-        for s in self.selectables:
-            if s.deadline and now > s.deadline:
-                s.expired()
-
-        reactor.yield_()
+from ._handlers import MessagingHandler, IncomingMessageHandler, OutgoingMessageHandler, \
+    EndpointStateHandler, TransactionHandler, TransactionalClientHandler,\
+    Reject, Release,\
+    Handshaker, FlowController, IOHandler, PythonIO
+
+__all__ = [
+    'MessagingHandler',
+    'IncomingMessageHandler',
+    'OutgoingMessageHandler',
+    'EndpointStateHandler',
+    'TransactionHandler',
+    'TransactionalClientHandler',
+    'Reject',
+    'Release',
+    'Handshaker',
+    'FlowController',
+    'IOHandler',
+    'PythonIO'
+]

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/reactor.py b/python/proton/reactor.py
index ab9c7db..998b86a 100644
--- a/python/proton/reactor.py
+++ b/python/proton/reactor.py
@@ -19,964 +19,22 @@
 
 from __future__ import absolute_import
 
-import os
-import logging
-import traceback
-import uuid
-
-from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
-    pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
-    pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
-    pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
-    pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
-    pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
-    pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
-    pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
-    pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
-
-from ._delivery import  Delivery
-from ._endpoints import Connection, Endpoint, Link, Session, Terminus
-from ._data import Described, symbol, ulong
-from ._message import  Message
-from ._transport import Transport, SSL, SSLDomain, SSLUnavailable
-from ._url import Url
-from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
-from ._events import EventType, EventBase, Handler
-from ._reactor_impl import Selectable, WrappedHandler, _chandler
-from ._wrapper import Wrapper, PYCTX
-
-from .handlers import OutgoingMessageHandler
-
-from . import _compat
-from ._compat import queue
-
-log = logging.getLogger("proton")
-
-
-def generate_uuid():
-    return uuid.uuid4()
-
-
-def _timeout2millis(secs):
-    if secs is None: return PN_MILLIS_MAX
-    return secs2millis(secs)
-
-
-def _millis2timeout(millis):
-    if millis == PN_MILLIS_MAX: return None
-    return millis2secs(millis)
-
-
-class Task(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            return Task(impl)
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl, pn_task_attachments)
-
-    def _init(self):
-        pass
-
-    def cancel(self):
-        pn_task_cancel(self._impl)
-
-
-class Acceptor(Wrapper):
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl)
-
-    def set_ssl_domain(self, ssl_domain):
-        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
-
-    def close(self):
-        pn_acceptor_close(self._impl)
-
-
-class Reactor(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            record = pn_reactor_attachments(impl)
-            attrs = pn_void2py(pn_record_get(record, PYCTX))
-            if attrs and 'subclass' in attrs:
-                return attrs['subclass'](impl=impl)
-            else:
-                return Reactor(impl=impl)
-
-    def __init__(self, *handlers, **kwargs):
-        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
-        for h in handlers:
-            self.handler.add(h, on_error=self.on_error_delegate())
-
-    def _init(self):
-        self.errors = []
-
-    # on_error relay handler tied to underlying C reactor.  Use when the
-    # error will always be generated from a callback from this reactor.
-    # Needed to prevent reference cycles and be compatible with wrappers.
-    class ErrorDelegate(object):
-        def __init__(self, reactor):
-            self.reactor_impl = reactor._impl
-
-        def on_error(self, info):
-            ractor = Reactor.wrap(self.reactor_impl)
-            ractor.on_error(info)
-
-    def on_error_delegate(self):
-        return Reactor.ErrorDelegate(self).on_error
-
-    def on_error(self, info):
-        self.errors.append(info)
-        self.yield_()
-
-    def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
-
-    def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error_delegate())
-        pn_reactor_set_global_handler(self._impl, impl)
-        pn_decref(impl)
-
-    global_handler = property(_get_global, _set_global)
-
-    def _get_timeout(self):
-        return _millis2timeout(pn_reactor_get_timeout(self._impl))
-
-    def _set_timeout(self, secs):
-        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
-
-    timeout = property(_get_timeout, _set_timeout)
-
-    def yield_(self):
-        pn_reactor_yield(self._impl)
-
-    def mark(self):
-        return pn_reactor_mark(self._impl)
-
-    def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
-
-    def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error_delegate())
-        pn_reactor_set_handler(self._impl, impl)
-        pn_decref(impl)
-
-    handler = property(_get_handler, _set_handler)
-
-    def run(self):
-        self.timeout = 3.14159265359
-        self.start()
-        while self.process(): pass
-        self.stop()
-        self.process()
-        self.global_handler = None
-        self.handler = None
-
-    def wakeup(self):
-        n = pn_reactor_wakeup(self._impl)
-        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
-
-    def start(self):
-        pn_reactor_start(self._impl)
-
-    @property
-    def quiesced(self):
-        return pn_reactor_quiesced(self._impl)
-
-    def _check_errors(self):
-        if self.errors:
-            for exc, value, tb in self.errors[:-1]:
-                traceback.print_exception(exc, value, tb)
-            exc, value, tb = self.errors[-1]
-            _compat.raise_(exc, value, tb)
-
-    def process(self):
-        result = pn_reactor_process(self._impl)
-        self._check_errors()
-        return result
-
-    def stop(self):
-        pn_reactor_stop(self._impl)
-        self._check_errors()
-
-    def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error_delegate())
-        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
-        pn_decref(impl)
-        return task
-
-    def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error_delegate())
-        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
-        pn_decref(impl)
-        if aimpl:
-            return Acceptor(aimpl)
-        else:
-            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
-
-    def connection(self, handler=None):
-        """Deprecated: use connection_to_host() instead
-        """
-        impl = _chandler(handler, self.on_error_delegate())
-        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
-        if impl: pn_decref(impl)
-        return result
-
-    def connection_to_host(self, host, port, handler=None):
-        """Create an outgoing Connection that will be managed by the reactor.
-        The reactor's pn_iohandler will create a socket connection to the host
-        once the connection is opened.
-        """
-        conn = self.connection(handler)
-        self.set_connection_host(conn, host, port)
-        return conn
-
-    def set_connection_host(self, connection, host, port):
-        """Change the address used by the connection.  The address is
-        used by the reactor's iohandler to create an outgoing socket
-        connection.  This must be set prior to opening the connection.
-        """
-        pn_reactor_set_connection_host(self._impl,
-                                       connection._impl,
-                                       unicode2utf8(str(host)),
-                                       unicode2utf8(str(port)))
-
-    def get_connection_address(self, connection):
-        """This may be used to retrieve the remote peer address.
-        @return: string containing the address in URL format or None if no
-        address is available.  Use the proton.Url class to create a Url object
-        from the returned value.
-        """
-        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
-        return utf82unicode(_url)
-
-    def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error_delegate())
-        result = Selectable.wrap(pn_reactor_selectable(self._impl))
-        if impl:
-            record = pn_selectable_attachments(result._impl)
-            pn_record_set_handler(record, impl)
-            pn_decref(impl)
-        return result
-
-    def update(self, sel):
-        pn_reactor_update(self._impl, sel._impl)
-
-    def push_event(self, obj, etype):
-        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-
-from ._events import wrappers as _wrappers
-
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
-
-
-class EventInjector(object):
-    """
-    Can be added to a reactor to allow events to be triggered by an
-    external thread but handled on the event thread associated with
-    the reactor. An instance of this class can be passed to the
-    Reactor.selectable() method of the reactor in order to activate
-    it. The close() method should be called when it is no longer
-    needed, to allow the event loop to end if needed.
-    """
-
-    def __init__(self):
-        self.queue = queue.Queue()
-        self.pipe = os.pipe()
-        self._closed = False
-
-    def trigger(self, event):
-        """
-        Request that the given event be dispatched on the event thread
-        of the reactor to which this EventInjector was added.
-        """
-        self.queue.put(event)
-        os.write(self.pipe[1], b"!")
-
-    def close(self):
-        """
-        Request that this EventInjector be closed. Existing events
-        will be dispatched on the reactors event dispatch thread,
-        then this will be removed from the set of interest.
-        """
-        self._closed = True
-        os.write(self.pipe[1], b"!")
-
-    def fileno(self):
-        return self.pipe[0]
-
-    def on_selectable_init(self, event):
-        sel = event.context
-        sel.fileno(self.fileno())
-        sel.reading = True
-        event.reactor.update(sel)
-
-    def on_selectable_readable(self, event):
-        os.read(self.pipe[0], 512)
-        while not self.queue.empty():
-            requested = self.queue.get()
-            event.reactor.push_event(requested.context, requested.type)
-        if self._closed:
-            s = event.context
-            s.terminate()
-            event.reactor.update(s)
-
-
-class ApplicationEvent(EventBase):
-    """
-    Application defined event, which can optionally be associated with
-    an engine object and or an arbitrary subject
-    """
-
-    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
-        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
-        self.connection = connection
-        self.session = session
-        self.link = link
-        self.delivery = delivery
-        if self.delivery:
-            self.link = self.delivery.link
-        if self.link:
-            self.session = self.link.session
-        if self.session:
-            self.connection = self.session.connection
-        self.subject = subject
-
-    def __repr__(self):
-        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
-        return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
-
-
-class Transaction(object):
-    """
-    Class to track state of an AMQP 1.0 transaction.
-    """
-
-    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
-        self.txn_ctrl = txn_ctrl
-        self.handler = handler
-        self.id = None
-        self._declare = None
-        self._discharge = None
-        self.failed = False
-        self._pending = []
-        self.settle_before_discharge = settle_before_discharge
-        self.declare()
-
-    def commit(self):
-        self.discharge(False)
-
-    def abort(self):
-        self.discharge(True)
-
-    def declare(self):
-        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
-
-    def discharge(self, failed):
-        self.failed = failed
-        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
-
-    def _send_ctrl(self, descriptor, value):
-        delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
-        delivery.transaction = self
-        return delivery
-
-    def send(self, sender, msg, tag=None):
-        dlv = sender.send(msg, tag=tag)
-        dlv.local.data = [self.id]
-        dlv.update(0x34)
-        return dlv
-
-    def accept(self, delivery):
-        self.update(delivery, PN_ACCEPTED)
-        if self.settle_before_discharge:
-            delivery.settle()
-        else:
-            self._pending.append(delivery)
-
-    def update(self, delivery, state=None):
-        if state:
-            delivery.local.data = [self.id, Described(ulong(state), [])]
-            delivery.update(0x34)
-
-    def _release_pending(self):
-        for d in self._pending:
-            d.update(Delivery.RELEASED)
-            d.settle()
-        self._clear_pending()
-
-    def _clear_pending(self):
-        self._pending = []
-
-    def handle_outcome(self, event):
-        if event.delivery == self._declare:
-            if event.delivery.remote.data:
-                self.id = event.delivery.remote.data[0]
-                self.handler.on_transaction_declared(event)
-            elif event.delivery.remote_state == Delivery.REJECTED:
-                self.handler.on_transaction_declare_failed(event)
-            else:
-                log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
-                self.handler.on_transaction_declare_failed(event)
-        elif event.delivery == self._discharge:
-            if event.delivery.remote_state == Delivery.REJECTED:
-                if not self.failed:
-                    self.handler.on_transaction_commit_failed(event)
-                    self._release_pending()  # make this optional?
-            else:
-                if self.failed:
-                    self.handler.on_transaction_aborted(event)
-                    self._release_pending()
-                else:
-                    self.handler.on_transaction_committed(event)
-            self._clear_pending()
-
-
-class LinkOption(object):
-    """
-    Abstract interface for link configuration options
-    """
-
-    def apply(self, link):
-        """
-        Subclasses will implement any configuration logic in this
-        method
-        """
-        pass
-
-    def test(self, link):
-        """
-        Subclasses can override this to selectively apply an option
-        e.g. based on some link criteria
-        """
-        return True
-
-
-class AtMostOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_SETTLED
-
-
-class AtLeastOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_UNSETTLED
-        link.rcv_settle_mode = Link.RCV_FIRST
-
-
-class SenderOption(LinkOption):
-    def apply(self, sender): pass
-
-    def test(self, link): return link.is_sender
-
-
-class ReceiverOption(LinkOption):
-    def apply(self, receiver): pass
-
-    def test(self, link): return link.is_receiver
-
-
-class DynamicNodeProperties(LinkOption):
-    def __init__(self, props={}):
-        self.properties = {}
-        for k in props:
-            if isinstance(k, symbol):
-                self.properties[k] = props[k]
-            else:
-                self.properties[symbol(k)] = props[k]
-
-    def apply(self, link):
-        if link.is_receiver:
-            link.source.properties.put_dict(self.properties)
-        else:
-            link.target.properties.put_dict(self.properties)
-
-
-class Filter(ReceiverOption):
-    def __init__(self, filter_set={}):
-        self.filter_set = filter_set
-
-    def apply(self, receiver):
-        receiver.source.filter.put_dict(self.filter_set)
-
-
-class Selector(Filter):
-    """
-    Configures a link with a message selector filter
-    """
-
-    def __init__(self, value, name='selector'):
-        super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
-
-
-class DurableSubscription(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.durability = Terminus.DELIVERIES
-        receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
-
-
-class Move(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
-
-
-class Copy(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
-
-
-def _apply_link_options(options, link):
-    if options:
-        if isinstance(options, list):
-            for o in options:
-                if o.test(link): o.apply(link)
-        else:
-            if options.test(link): options.apply(link)
-
-
-def _create_session(connection, handler=None):
-    session = connection.session()
-    session.open()
-    return session
-
-
-def _get_attr(target, name):
-    if hasattr(target, name):
-        return getattr(target, name)
-    else:
-        return None
-
-
-class SessionPerConnection(object):
-    def __init__(self):
-        self._default_session = None
-
-    def session(self, connection):
-        if not self._default_session:
-            self._default_session = _create_session(connection)
-        return self._default_session
-
-
-class GlobalOverrides(object):
-    """
-    Internal handler that triggers the necessary socket connect for an
-    opened connection.
-    """
-
-    def __init__(self, base):
-        self.base = base
-
-    def on_unhandled(self, name, event):
-        if not self._override(event):
-            event.dispatch(self.base)
-
-    def _override(self, event):
-        conn = event.connection
-        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
-
-
-class Connector(Handler):
-    """
-    Internal handler that triggers the necessary socket connect for an
-    opened connection.
-    """
-
-    def __init__(self, connection):
-        self.connection = connection
-        self.address = None
-        self.heartbeat = None
-        self.reconnect = None
-        self.ssl_domain = None
-        self.allow_insecure_mechs = True
-        self.allowed_mechs = None
-        self.sasl_enabled = True
-        self.user = None
-        self.password = None
-        self.virtual_host = None
-        self.ssl_sni = None
-        self.max_frame_size = None
-
-    def _connect(self, connection, reactor):
-        assert (reactor is not None)
-        url = self.address.next()
-        reactor.set_connection_host(connection, url.host, str(url.port))
-        # if virtual-host not set, use host from address as default
-        if self.virtual_host is None:
-            connection.hostname = url.host
-        log.debug("connecting to %r..." % url)
-
-        transport = Transport()
-        if self.sasl_enabled:
-            sasl = transport.sasl()
-            sasl.allow_insecure_mechs = self.allow_insecure_mechs
-            if url.username:
-                connection.user = url.username
-            elif self.user:
-                connection.user = self.user
-            if url.password:
-                connection.password = url.password
-            elif self.password:
-                connection.password = self.password
-            if self.allowed_mechs:
-                sasl.allowed_mechs(self.allowed_mechs)
-        transport.bind(connection)
-        if self.heartbeat:
-            transport.idle_timeout = self.heartbeat
-        if url.scheme == 'amqps':
-            if not self.ssl_domain:
-                raise SSLUnavailable("amqps: SSL libraries not found")
-            self.ssl = SSL(transport, self.ssl_domain)
-            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
-        if self.max_frame_size:
-            transport.max_frame_size = self.max_frame_size
-
-    def on_connection_local_open(self, event):
-        self._connect(event.connection, event.reactor)
-
-    def on_connection_remote_open(self, event):
-        log.debug("connected to %s" % event.connection.hostname)
-        if self.reconnect:
-            self.reconnect.reset()
-            self.transport = None
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        if self.connection is None: return
-        if self.connection.state & Endpoint.LOCAL_ACTIVE:
-            if self.reconnect:
-                event.transport.unbind()
-                delay = self.reconnect.next()
-                if delay == 0:
-                    log.info("Disconnected, reconnecting...")
-                    self._connect(self.connection, event.reactor)
-                    return
-                else:
-                    log.info("Disconnected will try to reconnect after %s seconds" % delay)
-                    event.reactor.schedule(delay, self)
-                    return
-            else:
-                log.debug("Disconnected")
-        # See connector.cpp: conn.free()/pn_connection_release() here?
-        self.connection = None
-
-    def on_timer_task(self, event):
-        self._connect(self.connection, event.reactor)
-
-
-class Backoff(object):
-    """
-    A reconnect strategy involving an increasing delay between
-    retries, up to a maximum or 10 seconds.
-    """
-
-    def __init__(self):
-        self.delay = 0
-
-    def reset(self):
-        self.delay = 0
-
-    def next(self):
-        current = self.delay
-        if current == 0:
-            self.delay = 0.1
-        else:
-            self.delay = min(10, 2 * current)
-        return current
-
-
-class Urls(object):
-    def __init__(self, values):
-        self.values = [Url(v) for v in values]
-        self.i = iter(self.values)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        try:
-            return next(self.i)
-        except StopIteration:
-            self.i = iter(self.values)
-            return next(self.i)
-
-
-class SSLConfig(object):
-    def __init__(self):
-        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
-        self.server = SSLDomain(SSLDomain.MODE_SERVER)
-
-    def set_credentials(self, cert_file, key_file, password):
-        self.client.set_credentials(cert_file, key_file, password)
-        self.server.set_credentials(cert_file, key_file, password)
-
-    def set_trusted_ca_db(self, certificate_db):
-        self.client.set_trusted_ca_db(certificate_db)
-        self.server.set_trusted_ca_db(certificate_db)
-
-
-class Container(Reactor):
-    """A representation of the AMQP concept of a 'container', which
-       loosely speaking is something that establishes links to or from
-       another container, over which messages are transfered. This is
-       an extension to the Reactor class that adds convenience methods
-       for creating connections and sender- or receiver- links.
-    """
-
-    def __init__(self, *handlers, **kwargs):
-        super(Container, self).__init__(*handlers, **kwargs)
-        if "impl" not in kwargs:
-            try:
-                self.ssl = SSLConfig()
-            except SSLUnavailable:
-                self.ssl = None
-            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
-            self.trigger = None
-            self.container_id = str(generate_uuid())
-            self.allow_insecure_mechs = True
-            self.allowed_mechs = None
-            self.sasl_enabled = True
-            self.user = None
-            self.password = None
-            Wrapper.__setattr__(self, 'subclass', self.__class__)
-
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
-                **kwargs):
-        """
-        Initiates the establishment of an AMQP connection. Returns an
-        instance of proton.Connection.
-
-        @param url: URL string of process to connect to
-
-        @param urls: list of URL strings of process to try to connect to
-
-        Only one of url or urls should be specified.
-
-        @param reconnect: Reconnect is enabled by default.  You can
-        pass in an instance of Backoff to control reconnect behavior.
-        A value of False will prevent the library from automatically
-        trying to reconnect if the underlying socket is disconnected
-        before the connection has been closed.
-
-        @param heartbeat: A value in milliseconds indicating the
-        desired frequency of heartbeats used to test the underlying
-        socket is alive.
-
-        @param ssl_domain: SSL configuration in the form of an
-        instance of proton.SSLDomain.
-
-        @param handler: a connection scoped handler that will be
-        called to process any events in the scope of this connection
-        or its child links
-
-        @param kwargs: 'sasl_enabled', which determines whether a sasl
-        layer is used for the connection; 'allowed_mechs', an optional
-        string containing a space-separated list of SASL mechanisms to
-        allow if sasl is enabled; 'allow_insecure_mechs', a flag
-        indicating whether insecure mechanisms, such as PLAIN over a
-        non-encrypted socket, are allowed; 'virtual_host', the
-        hostname to set in the Open performative used by peer to
-        determine the correct back-end service for the client. If
-        'virtual_host' is not supplied the host field from the URL is
-        used instead; 'user', the user to authenticate; 'password',
-        the authentication secret.
-
-        """
-        conn = self.connection(handler)
-        conn.container = self.container_id or str(generate_uuid())
-        conn.offered_capabilities = kwargs.get('offered_capabilities')
-        conn.desired_capabilities = kwargs.get('desired_capabilities')
-        conn.properties = kwargs.get('properties')
-
-        connector = Connector(conn)
-        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
-        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
-        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
-        connector.user = kwargs.get('user', self.user)
-        connector.password = kwargs.get('password', self.password)
-        connector.virtual_host = kwargs.get('virtual_host')
-        if connector.virtual_host:
-            # only set hostname if virtual-host is a non-empty string
-            conn.hostname = connector.virtual_host
-        connector.ssl_sni = kwargs.get('sni')
-        connector.max_frame_size = kwargs.get('max_frame_size')
-
-        conn._overrides = connector
-        if url:
-            connector.address = Urls([url])
-        elif urls:
-            connector.address = Urls(urls)
-        elif address:
-            connector.address = address
-        else:
-            raise ValueError("One of url, urls or address required")
-        if heartbeat:
-            connector.heartbeat = heartbeat
-        if reconnect:
-            connector.reconnect = reconnect
-        elif reconnect is None:
-            connector.reconnect = Backoff()
-        # use container's default client domain if none specified.  This is
-        # only necessary of the URL specifies the "amqps:" scheme
-        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
-        conn._session_policy = SessionPerConnection()  # todo: make configurable
-        conn.open()
-        return conn
-
-    def _get_id(self, container, remote, local):
-        if local and remote:
-            "%s-%s-%s" % (container, remote, local)
-        elif local:
-            return "%s-%s" % (container, local)
-        elif remote:
-            return "%s-%s" % (container, remote)
-        else:
-            return "%s-%s" % (container, str(generate_uuid()))
-
-    def _get_session(self, context):
-        if isinstance(context, Url):
-            return self._get_session(self.connect(url=context))
-        elif isinstance(context, Session):
-            return context
-        elif isinstance(context, Connection):
-            if hasattr(context, '_session_policy'):
-                return context._session_policy.session(context)
-            else:
-                return _create_session(context)
-        else:
-            return context.session()
-
-    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
-        """
-        Initiates the establishment of a link over which messages can
-        be sent. Returns an instance of proton.Sender.
-
-        There are two patterns of use. (1) A connection can be passed
-        as the first argument, in which case the link is established
-        on that connection. In this case the target address can be
-        specified as the second argument (or as a keyword
-        argument). The source address can also be specified if
-        desired. (2) Alternatively a URL can be passed as the first
-        argument. In this case a new connection will be established on
-        which the link will be attached. If a path is specified and
-        the target is not, then the path of the URL is used as the
-        target address.
-
-        The name of the link may be specified if desired, otherwise a
-        unique name will be generated.
-
-        Various LinkOptions can be specified to further control the
-        attachment.
-        """
-        if isstring(context):
-            context = Url(context)
-        if isinstance(context, Url) and not target:
-            target = context.path
-        session = self._get_session(context)
-        snd = session.sender(name or self._get_id(session.connection.container, target, source))
-        if source:
-            snd.source.address = source
-        if target:
-            snd.target.address = target
-        if handler != None:
-            snd.handler = handler
-        if tags:
-            snd.tag_generator = tags
-        _apply_link_options(options, snd)
-        snd.open()
-        return snd
-
-    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
-        """
-        Initiates the establishment of a link over which messages can
-        be received (aka a subscription). Returns an instance of
-        proton.Receiver.
-
-        There are two patterns of use. (1) A connection can be passed
-        as the first argument, in which case the link is established
-        on that connection. In this case the source address can be
-        specified as the second argument (or as a keyword
-        argument). The target address can also be specified if
-        desired. (2) Alternatively a URL can be passed as the first
-        argument. In this case a new connection will be established on
-        which the link will be attached. If a path is specified and
-        the source is not, then the path of the URL is used as the
-        target address.
-
-        The name of the link may be specified if desired, otherwise a
-        unique name will be generated.
-
-        Various LinkOptions can be specified to further control the
-        attachment.
-        """
-        if isstring(context):
-            context = Url(context)
-        if isinstance(context, Url) and not source:
-            source = context.path
-        session = self._get_session(context)
-        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
-        if source:
-            rcv.source.address = source
-        if dynamic:
-            rcv.source.dynamic = True
-        if target:
-            rcv.target.address = target
-        if handler != None:
-            rcv.handler = handler
-        _apply_link_options(options, rcv)
-        rcv.open()
-        return rcv
-
-    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
-        if not _get_attr(context, '_txn_ctrl'):
-            class InternalTransactionHandler(OutgoingMessageHandler):
-                def __init__(self):
-                    super(InternalTransactionHandler, self).__init__(auto_settle=True)
-
-                def on_settled(self, event):
-                    if hasattr(event.delivery, "transaction"):
-                        event.transaction = event.delivery.transaction
-                        event.delivery.transaction.handle_outcome(event)
-
-                def on_unhandled(self, method, event):
-                    if handler:
-                        event.dispatch(handler)
-
-            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
-            context._txn_ctrl.target.type = Terminus.COORDINATOR
-            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
-        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
-
-    def listen(self, url, ssl_domain=None):
-        """
-        Initiates a server socket, accepting incoming AMQP connections
-        on the interface and port specified.
-        """
-        url = Url(url)
-        acceptor = self.acceptor(url.host, url.port)
-        ssl_config = ssl_domain
-        if not ssl_config and url.scheme == 'amqps':
-            # use container's default server domain
-            if self.ssl:
-                ssl_config = self.ssl.server
-            else:
-                raise SSLUnavailable("amqps: SSL libraries not found")
-        if ssl_config:
-            acceptor.set_ssl_domain(ssl_config)
-        return acceptor
-
-    def do_work(self, timeout=None):
-        if timeout:
-            self.timeout = timeout
-        return self.process()
+from ._reactor import Container, ApplicationEvent, EventInjector, Handler,\
+    AtLeastOnce, AtMostOnce, DynamicNodeProperties, Filter, Selector, DurableSubscription, Copy, Move,\
+    Reactor
+
+__all__ = [
+    'Container',
+    'ApplicationEvent',
+    'EventInjector',
+    'Handler',
+    'AtLeastOnce',
+    'AtMostOnce',
+    'DynamicNodeProperties',
+    'Filter',
+    'Selector',
+    'DurableSubscription',
+    'Copy',
+    'Move',
+    'Reactor'
+]

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/python/proton/utils.py b/python/proton/utils.py
index fd8f57a..270e534 100644
--- a/python/proton/utils.py
+++ b/python/proton/utils.py
@@ -19,403 +19,12 @@
 
 from __future__ import absolute_import
 
-import collections
-import time
-import threading
-
-from cproton import pn_reactor_collector, pn_collector_release
-
-from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
-from ._delivery import Delivery
-from ._endpoints import Endpoint, Link
-from ._events import Handler
-from ._url import Url
-
-from .reactor import Container
-from .handlers import MessagingHandler, IncomingMessageHandler
-
-
-class BlockingLink(object):
-    def __init__(self, connection, link):
-        self.connection = connection
-        self.link = link
-        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
-                             msg="Opening link %s" % link.name)
-        self._checkClosed()
-
-    def _waitForClose(self, timeout=1):
-        try:
-            self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
-                                 timeout=timeout,
-                                 msg="Opening link %s" % self.link.name)
-        except Timeout as e:
-            pass
-        self._checkClosed()
-
-    def _checkClosed(self):
-        if self.link.state & Endpoint.REMOTE_CLOSED:
-            self.link.close()
-            if not self.connection.closing:
-                raise LinkDetached(self.link)
-
-    def close(self):
-        self.link.close()
-        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
-                             msg="Closing link %s" % self.link.name)
-
-    # Access to other link attributes.
-    def __getattr__(self, name):
-        return getattr(self.link, name)
-
-
-class SendException(ProtonException):
-    """
-    Exception used to indicate an exceptional state/condition on a send request
-    """
-
-    def __init__(self, state):
-        self.state = state
-
-
-def _is_settled(delivery):
-    return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
-
-
-class BlockingSender(BlockingLink):
-    def __init__(self, connection, sender):
-        super(BlockingSender, self).__init__(connection, sender)
-        if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
-            # this may be followed by a detach, which may contain an error condition, so wait a little...
-            self._waitForClose()
-            # ...but close ourselves if peer does not
-            self.link.close()
-            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
-
-    def send(self, msg, timeout=False, error_states=None):
-        delivery = self.link.send(msg)
-        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
-                             timeout=timeout)
-        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
-            delivery.settle()
-        bad = error_states
-        if bad is None:
-            bad = [Delivery.REJECTED, Delivery.RELEASED]
-        if delivery.remote_state in bad:
-            raise SendException(delivery.remote_state)
-        return delivery
-
-
-class Fetcher(MessagingHandler):
-    def __init__(self, connection, prefetch):
-        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
-        self.connection = connection
-        self.incoming = collections.deque([])
-        self.unsettled = collections.deque([])
-
-    def on_message(self, event):
-        self.incoming.append((event.message, event.delivery))
-        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
-
-    def on_link_error(self, event):
-        if event.link.state & Endpoint.LOCAL_ACTIVE:
-            event.link.close()
-            if not self.connection.closing:
-                raise LinkDetached(event.link)
-
-    def on_connection_error(self, event):
-        if not self.connection.closing:
-            raise ConnectionClosed(event.connection)
-
-    @property
-    def has_message(self):
-        return len(self.incoming)
-
-    def pop(self):
-        message, delivery = self.incoming.popleft()
-        if not delivery.settled:
-            self.unsettled.append(delivery)
-        return message
-
-    def settle(self, state=None):
-        delivery = self.unsettled.popleft()
-        if state:
-            delivery.update(state)
-        delivery.settle()
-
-
-class BlockingReceiver(BlockingLink):
-    def __init__(self, connection, receiver, fetcher, credit=1):
-        super(BlockingReceiver, self).__init__(connection, receiver)
-        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
-            # this may be followed by a detach, which may contain an error condition, so wait a little...
-            self._waitForClose()
-            # ...but close ourselves if peer does not
-            self.link.close()
-            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
-        if credit: receiver.flow(credit)
-        self.fetcher = fetcher
-        self.container = connection.container
-
-    def __del__(self):
-        self.fetcher = None
-        # The next line causes a core dump if the Proton-C reactor finalizes
-        # first.  The self.container reference prevents out of order reactor
-        # finalization. It may not be set if exception in BlockingLink.__init__
-        if hasattr(self, "container"):
-            self.link.handler = None  # implicit call to reactor
-
-    def receive(self, timeout=False):
-        if not self.fetcher:
-            raise Exception("Can't call receive on this receiver as a handler was provided")
-        if not self.link.credit:
-            self.link.flow(1)
-        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
-                             timeout=timeout)
-        return self.fetcher.pop()
-
-    def accept(self):
-        self.settle(Delivery.ACCEPTED)
-
-    def reject(self):
-        self.settle(Delivery.REJECTED)
-
-    def release(self, delivered=True):
-        if delivered:
-            self.settle(Delivery.MODIFIED)
-        else:
-            self.settle(Delivery.RELEASED)
-
-    def settle(self, state=None):
-        if not self.fetcher:
-            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
-        self.fetcher.settle(state)
-
-
-class LinkDetached(LinkException):
-    def __init__(self, link):
-        self.link = link
-        if link.is_sender:
-            txt = "sender %s to %s closed" % (link.name, link.target.address)
-        else:
-            txt = "receiver %s from %s closed" % (link.name, link.source.address)
-        if link.remote_condition:
-            txt += " due to: %s" % link.remote_condition
-            self.condition = link.remote_condition.name
-        else:
-            txt += " by peer"
-            self.condition = None
-        super(LinkDetached, self).__init__(txt)
-
-
-class ConnectionClosed(ConnectionException):
-    def __init__(self, connection):
-        self.connection = connection
-        txt = "Connection %s closed" % connection.hostname
-        if connection.remote_condition:
-            txt += " due to: %s" % connection.remote_condition
-            self.condition = connection.remote_condition.name
-        else:
-            txt += " by peer"
-            self.condition = None
-        super(ConnectionClosed, self).__init__(txt)
-
-
-class BlockingConnection(Handler):
-    """
-    A synchronous style connection wrapper.
-
-    This object's implementation uses OS resources.  To ensure they
-    are released when the object is no longer in use, make sure that
-    object operations are enclosed in a try block and that close() is
-    always executed on exit.
-    """
-
-    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
-        self.disconnected = False
-        self.timeout = timeout or 60
-        self.container = container or Container()
-        self.container.timeout = self.timeout
-        self.container.start()
-        self.url = Url(url).defaults()
-        self.conn = None
-        self.closing = False
-        failed = True
-        try:
-            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
-                                               heartbeat=heartbeat, **kwargs)
-            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
-                      msg="Opening connection")
-            failed = False
-        finally:
-            if failed and self.conn:
-                self.close()
-
-    def create_sender(self, address, handler=None, name=None, options=None):
-        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
-                                                                 options=options))
-
-    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
-        prefetch = credit
-        if handler:
-            fetcher = None
-            if prefetch is None:
-                prefetch = 1
-        else:
-            fetcher = Fetcher(self, credit)
-        return BlockingReceiver(
-            self,
-            self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
-                                           options=options), fetcher, credit=prefetch)
-
-    def close(self):
-        # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
-        if self.closing:
-            return
-        self.closing = True
-        self.container.errors = []
-        try:
-            if self.conn:
-                self.conn.close()
-                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
-                          msg="Closing connection")
-        finally:
-            self.conn.free()
-            # Nothing left to block on.  Allow reactor to clean up.
-            self.run()
-            self.conn = None
-            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
-            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
-            self.container = None
-
-    def _is_closed(self):
-        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
-
-    def run(self):
-        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
-        while self.container.process(): pass
-        self.container.stop()
-        self.container.process()
-
-    def wait(self, condition, timeout=False, msg=None):
-        """Call process until condition() is true"""
-        if timeout is False:
-            timeout = self.timeout
-        if timeout is None:
-            while not condition() and not self.disconnected:
-                self.container.process()
-        else:
-            container_timeout = self.container.timeout
-            self.container.timeout = timeout
-            try:
-                deadline = time.time() + timeout
-                while not condition() and not self.disconnected:
-                    self.container.process()
-                    if deadline < time.time():
-                        txt = "Connection %s timed out" % self.url
-                        if msg: txt += ": " + msg
-                        raise Timeout(txt)
-            finally:
-                self.container.timeout = container_timeout
-        if self.disconnected or self._is_closed():
-            self.container.stop()
-            self.conn.handler = None  # break cyclical reference
-        if self.disconnected and not self._is_closed():
-            raise ConnectionException(
-                "Connection %s disconnected: %s" % (self.url, self.disconnected))
-
-    def on_link_remote_close(self, event):
-        if event.link.state & Endpoint.LOCAL_ACTIVE:
-            event.link.close()
-            if not self.closing:
-                raise LinkDetached(event.link)
-
-    def on_connection_remote_close(self, event):
-        if event.connection.state & Endpoint.LOCAL_ACTIVE:
-            event.connection.close()
-            if not self.closing:
-                raise ConnectionClosed(event.connection)
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_head_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        self.disconnected = event.transport.condition or "unknown"
-
-
-class AtomicCount(object):
-    def __init__(self, start=0, step=1):
-        """Thread-safe atomic counter. Start at start, increment by step."""
-        self.count, self.step = start, step
-        self.lock = threading.Lock()
-
-    def next(self):
-        """Get the next value"""
-        self.lock.acquire()
-        self.count += self.step;
-        result = self.count
-        self.lock.release()
-        return result
-
-
-class SyncRequestResponse(IncomingMessageHandler):
-    """
-    Implementation of the synchronous request-response (aka RPC) pattern.
-    @ivar address: Address for all requests, may be None.
-    @ivar connection: Connection for requests and responses.
-    """
-
-    correlation_id = AtomicCount()
-
-    def __init__(self, connection, address=None):
-        """
-        Send requests and receive responses. A single instance can send many requests
-        to the same or different addresses.
-
-        @param connection: A L{BlockingConnection}
-        @param address: Address for all requests.
-            If not specified, each request must have the address property set.
-            Successive messages may have different addresses.
-        """
-        super(SyncRequestResponse, self).__init__()
-        self.connection = connection
-        self.address = address
-        self.sender = self.connection.create_sender(self.address)
-        # dynamic=true generates a unique address dynamically for this receiver.
-        # credit=1 because we want to receive 1 response message initially.
-        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
-        self.response = None
-
-    def call(self, request):
-        """
-        Send a request message, wait for and return the response message.
-
-        @param request: A L{proton.Message}. If L{self.address} is not set the 
-            L{self.address} must be set and will be used.
-        """
-        if not self.address and not request.address:
-            raise ValueError("Request message has no address: %s" % request)
-        request.reply_to = self.reply_to
-        request.correlation_id = correlation_id = str(self.correlation_id.next())
-        self.sender.send(request)
-
-        def wakeup():
-            return self.response and (self.response.correlation_id == correlation_id)
-
-        self.connection.wait(wakeup, msg="Waiting for response")
-        response = self.response
-        self.response = None  # Ready for next response.
-        self.receiver.flow(1)  # Set up credit for the next response.
-        return response
-
-    @property
-    def reply_to(self):
-        """Return the dynamic address of our receiver."""
-        return self.receiver.remote_source.address
-
-    def on_message(self, event):
-        """Called when we receive a message for our receiver."""
-        self.response = event.message
-        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
+from ._utils import BlockingConnection, SyncRequestResponse, SendException, LinkDetached, ConnectionClosed
+
+__all__ = [
+    'BlockingConnection',
+    'SyncRequestResponse',
+    'SendException',
+    'LinkDetached',
+    'ConnectionClosed'
+]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org