You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/08/28 21:39:07 UTC

[7/7] qpid-proton git commit: PROTON-1922: [Python] Restrict exported symbols from proton submodules - Restricted symbols exported by proton.reactor, proton.handlers, proton.utils - All symbols used by tests and examples are exported - Other symbols that

PROTON-1922: [Python] Restrict exported symbols from proton submodules
- Restricted symbols exported by proton.reactor, proton.handlers, proton.utils
- All symbols used by tests and examples are exported
- Other symbols that seem to make sense to export are also exported


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5a566808
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5a566808
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5a566808

Branch: refs/heads/master
Commit: 5a56680848ab3165082ce2d280a4b109054c1b7e
Parents: c886daa
Author: Andrew Stitcher <as...@apache.org>
Authored: Tue Aug 28 17:33:59 2018 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Aug 28 17:33:59 2018 -0400

----------------------------------------------------------------------
 python/proton/_endpoints.py |   8 +-
 python/proton/_handlers.py  | 745 +++++++++++++++++++++++++++++
 python/proton/_reactor.py   | 982 +++++++++++++++++++++++++++++++++++++++
 python/proton/_utils.py     | 421 +++++++++++++++++
 python/proton/handlers.py   | 743 +----------------------------
 python/proton/reactor.py    | 980 +-------------------------------------
 python/proton/utils.py      | 409 +---------------
 7 files changed, 2199 insertions(+), 2089 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_endpoints.py
----------------------------------------------------------------------
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index bfa9880..bf72727 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -97,9 +97,9 @@ class Endpoint(object):
         assert False, "Subclass must override this!"
 
     def _get_handler(self):
-        from . import reactor
+        from . import _reactor
         from . import _reactor_impl
-        ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+        ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
         if ractor:
             on_error = ractor.on_error_delegate()
         else:
@@ -108,9 +108,9 @@ class Endpoint(object):
         return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
 
     def _set_handler(self, handler):
-        from . import reactor
+        from . import _reactor
         from . import _reactor_impl
-        ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+        ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
         if ractor:
             on_error = ractor.on_error_delegate()
         else:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
new file mode 100644
index 0000000..8b798f8
--- /dev/null
+++ b/python/proton/_handlers.py
@@ -0,0 +1,745 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import time
+import weakref
+from select import select
+
+from ._delivery import Delivery
+from ._endpoints import Endpoint
+from ._message import Message
+from ._exceptions import ProtonException
+from ._events import Handler, _dispatch
+
+log = logging.getLogger("proton")
+
+
+class OutgoingMessageHandler(Handler):
+    """
+    A utility for simpler and more intuitive handling of delivery
+    events related to outgoing i.e. sent messages.
+    """
+
+    def __init__(self, auto_settle=True, delegate=None):
+        self.auto_settle = auto_settle
+        self.delegate = delegate
+
+    def on_link_flow(self, event):
+        if event.link.is_sender and event.link.credit \
+                and event.link.state & Endpoint.LOCAL_ACTIVE \
+                and event.link.state & Endpoint.REMOTE_ACTIVE:
+            self.on_sendable(event)
+
+    def on_delivery(self, event):
+        dlv = event.delivery
+        if dlv.link.is_sender and dlv.updated:
+            if dlv.remote_state == Delivery.ACCEPTED:
+                self.on_accepted(event)
+            elif dlv.remote_state == Delivery.REJECTED:
+                self.on_rejected(event)
+            elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
+                self.on_released(event)
+            if dlv.settled:
+                self.on_settled(event)
+            if self.auto_settle:
+                dlv.settle()
+
+    def on_sendable(self, event):
+        """
+        Called when the sender link has credit and messages can
+        therefore be transferred.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_sendable', event)
+
+    def on_accepted(self, event):
+        """
+        Called when the remote peer accepts an outgoing message.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_accepted', event)
+
+    def on_rejected(self, event):
+        """
+        Called when the remote peer rejects an outgoing message.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_rejected', event)
+
+    def on_released(self, event):
+        """
+        Called when the remote peer releases an outgoing message. Note
+        that this may be in response to either the RELEASE or MODIFIED
+        state as defined by the AMQP specification.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_released', event)
+
+    def on_settled(self, event):
+        """
+        Called when the remote peer has settled the outgoing
+        message. This is the point at which it should never be
+        retransmitted.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_settled', event)
+
+
+def recv_msg(delivery):
+    msg = Message()
+    msg.decode(delivery.link.recv(delivery.pending))
+    delivery.link.advance()
+    return msg
+
+
+class Reject(ProtonException):
+    """
+    An exception that indicate a message should be rejected
+    """
+    pass
+
+
+class Release(ProtonException):
+    """
+    An exception that indicate a message should be rejected
+    """
+    pass
+
+
+class Acking(object):
+    def accept(self, delivery):
+        """
+        Accepts a received message.
+
+        Note that this method cannot currently be used in combination
+        with transactions.
+        """
+        self.settle(delivery, Delivery.ACCEPTED)
+
+    def reject(self, delivery):
+        """
+        Rejects a received message that is considered invalid or
+        unprocessable.
+        """
+        self.settle(delivery, Delivery.REJECTED)
+
+    def release(self, delivery, delivered=True):
+        """
+        Releases a received message, making it available at the source
+        for any (other) interested receiver. The ``delivered``
+        parameter indicates whether this should be considered a
+        delivery attempt (and the delivery count updated) or not.
+        """
+        if delivered:
+            self.settle(delivery, Delivery.MODIFIED)
+        else:
+            self.settle(delivery, Delivery.RELEASED)
+
+    def settle(self, delivery, state=None):
+        if state:
+            delivery.update(state)
+        delivery.settle()
+
+
+class IncomingMessageHandler(Handler, Acking):
+    """
+    A utility for simpler and more intuitive handling of delivery
+    events related to incoming i.e. received messages.
+    """
+
+    def __init__(self, auto_accept=True, delegate=None):
+        self.delegate = delegate
+        self.auto_accept = auto_accept
+
+    def on_delivery(self, event):
+        dlv = event.delivery
+        if not dlv.link.is_receiver: return
+        if dlv.aborted:
+            self.on_aborted(event)
+            dlv.settle()
+        elif dlv.readable and not dlv.partial:
+            event.message = recv_msg(dlv)
+            if event.link.state & Endpoint.LOCAL_CLOSED:
+                if self.auto_accept:
+                    dlv.update(Delivery.RELEASED)
+                    dlv.settle()
+            else:
+                try:
+                    self.on_message(event)
+                    if self.auto_accept:
+                        dlv.update(Delivery.ACCEPTED)
+                        dlv.settle()
+                except Reject:
+                    dlv.update(Delivery.REJECTED)
+                    dlv.settle()
+                except Release:
+                    dlv.update(Delivery.MODIFIED)
+                    dlv.settle()
+        elif dlv.updated and dlv.settled:
+            self.on_settled(event)
+
+    def on_message(self, event):
+        """
+        Called when a message is received. The message itself can be
+        obtained as a property on the event. For the purpose of
+        referring to this message in further actions (e.g. if
+        explicitly accepting it, the ``delivery`` should be used, also
+        obtainable via a property on the event.
+        """
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_message', event)
+
+    def on_settled(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_settled', event)
+
+    def on_aborted(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_aborted', event)
+
+
+class EndpointStateHandler(Handler):
+    """
+    A utility that exposes 'endpoint' events i.e. the open/close for
+    links, sessions and connections in a more intuitive manner. A
+    XXX_opened method will be called when both local and remote peers
+    have opened the link, session or connection. This can be used to
+    confirm a locally initiated action for example. A XXX_opening
+    method will be called when the remote peer has requested an open
+    that was not initiated locally. By default this will simply open
+    locally, which then triggers the XXX_opened call. The same applies
+    to close.
+    """
+
+    def __init__(self, peer_close_is_error=False, delegate=None):
+        self.delegate = delegate
+        self.peer_close_is_error = peer_close_is_error
+
+    @classmethod
+    def is_local_open(cls, endpoint):
+        return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+    @classmethod
+    def is_local_uninitialised(cls, endpoint):
+        return endpoint.state & Endpoint.LOCAL_UNINIT
+
+    @classmethod
+    def is_local_closed(cls, endpoint):
+        return endpoint.state & Endpoint.LOCAL_CLOSED
+
+    @classmethod
+    def is_remote_open(cls, endpoint):
+        return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+    @classmethod
+    def is_remote_closed(cls, endpoint):
+        return endpoint.state & Endpoint.REMOTE_CLOSED
+
+    @classmethod
+    def print_error(cls, endpoint, endpoint_type):
+        if endpoint.remote_condition:
+            log.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
+        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
+            log.error("%s closed by peer" % endpoint_type)
+
+    def on_link_remote_close(self, event):
+        if event.link.remote_condition:
+            self.on_link_error(event)
+        elif self.is_local_closed(event.link):
+            self.on_link_closed(event)
+        else:
+            self.on_link_closing(event)
+        event.link.close()
+
+    def on_session_remote_close(self, event):
+        if event.session.remote_condition:
+            self.on_session_error(event)
+        elif self.is_local_closed(event.session):
+            self.on_session_closed(event)
+        else:
+            self.on_session_closing(event)
+        event.session.close()
+
+    def on_connection_remote_close(self, event):
+        if event.connection.remote_condition:
+            if event.connection.remote_condition.name == "amqp:connection:forced":
+                # Treat this the same as just having the transport closed by the peer without
+                # sending any events. Allow reconnection to happen transparently.
+                return
+            self.on_connection_error(event)
+        elif self.is_local_closed(event.connection):
+            self.on_connection_closed(event)
+        else:
+            self.on_connection_closing(event)
+        event.connection.close()
+
+    def on_connection_local_open(self, event):
+        if self.is_remote_open(event.connection):
+            self.on_connection_opened(event)
+
+    def on_connection_remote_open(self, event):
+        if self.is_local_open(event.connection):
+            self.on_connection_opened(event)
+        elif self.is_local_uninitialised(event.connection):
+            self.on_connection_opening(event)
+            event.connection.open()
+
+    def on_session_local_open(self, event):
+        if self.is_remote_open(event.session):
+            self.on_session_opened(event)
+
+    def on_session_remote_open(self, event):
+        if self.is_local_open(event.session):
+            self.on_session_opened(event)
+        elif self.is_local_uninitialised(event.session):
+            self.on_session_opening(event)
+            event.session.open()
+
+    def on_link_local_open(self, event):
+        if self.is_remote_open(event.link):
+            self.on_link_opened(event)
+
+    def on_link_remote_open(self, event):
+        if self.is_local_open(event.link):
+            self.on_link_opened(event)
+        elif self.is_local_uninitialised(event.link):
+            self.on_link_opening(event)
+            event.link.open()
+
+    def on_connection_opened(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_connection_opened', event)
+
+    def on_session_opened(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_session_opened', event)
+
+    def on_link_opened(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_link_opened', event)
+
+    def on_connection_opening(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_connection_opening', event)
+
+    def on_session_opening(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_session_opening', event)
+
+    def on_link_opening(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_link_opening', event)
+
+    def on_connection_error(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_connection_error', event)
+        else:
+            self.log_error(event.connection, "connection")
+
+    def on_session_error(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_session_error', event)
+        else:
+            self.log_error(event.session, "session")
+            event.connection.close()
+
+    def on_link_error(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_link_error', event)
+        else:
+            self.log_error(event.link, "link")
+            event.connection.close()
+
+    def on_connection_closed(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_connection_closed', event)
+
+    def on_session_closed(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_session_closed', event)
+
+    def on_link_closed(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_link_closed', event)
+
+    def on_connection_closing(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_connection_closing', event)
+        elif self.peer_close_is_error:
+            self.on_connection_error(event)
+
+    def on_session_closing(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_session_closing', event)
+        elif self.peer_close_is_error:
+            self.on_session_error(event)
+
+    def on_link_closing(self, event):
+        if self.delegate is not None:
+            _dispatch(self.delegate, 'on_link_closing', event)
+        elif self.peer_close_is_error:
+            self.on_link_error(event)
+
+    def on_transport_tail_closed(self, event):
+        self.on_transport_closed(event)
+
+    def on_transport_closed(self, event):
+        if self.delegate is not None and event.connection and self.is_local_open(event.connection):
+            _dispatch(self.delegate, 'on_disconnected', event)
+
+
+class MessagingHandler(Handler, Acking):
+    """
+    A general purpose handler that makes the proton-c events somewhat
+    simpler to deal with and/or avoids repetitive tasks for common use
+    cases.
+    """
+
+    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
+        self.handlers = []
+        if prefetch:
+            self.handlers.append(FlowController(prefetch))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
+        self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
+        self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
+        self.fatal_conditions = ["amqp:unauthorized-access"]
+
+    def on_transport_error(self, event):
+        """
+        Called when some error is encountered with the transport over
+        which the AMQP connection is to be established. This includes
+        authentication errors as well as socket errors.
+        """
+        if event.transport.condition:
+            if event.transport.condition.info:
+                log.error("%s: %s: %s" % (
+                    event.transport.condition.name, event.transport.condition.description,
+                    event.transport.condition.info))
+            else:
+                log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
+            if event.transport.condition.name in self.fatal_conditions:
+                event.connection.close()
+        else:
+            logging.error("Unspecified transport error")
+
+    def on_connection_error(self, event):
+        """
+        Called when the peer closes the connection with an error condition.
+        """
+        EndpointStateHandler.print_error(event.connection, "connection")
+
+    def on_session_error(self, event):
+        """
+        Called when the peer closes the session with an error condition.
+        """
+        EndpointStateHandler.print_error(event.session, "session")
+        event.connection.close()
+
+    def on_link_error(self, event):
+        """
+        Called when the peer closes the link with an error condition.
+        """
+        EndpointStateHandler.print_error(event.link, "link")
+        event.connection.close()
+
+    def on_reactor_init(self, event):
+        """
+        Called when the event loop - the reactor - starts.
+        """
+        if hasattr(event.reactor, 'subclass'):
+            setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
+        self.on_start(event)
+
+    def on_start(self, event):
+        """
+        Called when the event loop starts. (Just an alias for on_reactor_init)
+        """
+        pass
+
+    def on_connection_closed(self, event):
+        """
+        Called when the connection is closed.
+        """
+        pass
+
+    def on_session_closed(self, event):
+        """
+        Called when the session is closed.
+        """
+        pass
+
+    def on_link_closed(self, event):
+        """
+        Called when the link is closed.
+        """
+        pass
+
+    def on_connection_closing(self, event):
+        """
+        Called when the peer initiates the closing of the connection.
+        """
+        pass
+
+    def on_session_closing(self, event):
+        """
+        Called when the peer initiates the closing of the session.
+        """
+        pass
+
+    def on_link_closing(self, event):
+        """
+        Called when the peer initiates the closing of the link.
+        """
+        pass
+
+    def on_disconnected(self, event):
+        """
+        Called when the socket is disconnected.
+        """
+        pass
+
+    def on_sendable(self, event):
+        """
+        Called when the sender link has credit and messages can
+        therefore be transferred.
+        """
+        pass
+
+    def on_accepted(self, event):
+        """
+        Called when the remote peer accepts an outgoing message.
+        """
+        pass
+
+    def on_rejected(self, event):
+        """
+        Called when the remote peer rejects an outgoing message.
+        """
+        pass
+
+    def on_released(self, event):
+        """
+        Called when the remote peer releases an outgoing message. Note
+        that this may be in response to either the RELEASE or MODIFIED
+        state as defined by the AMQP specification.
+        """
+        pass
+
+    def on_settled(self, event):
+        """
+        Called when the remote peer has settled the outgoing
+        message. This is the point at which it should never be
+        retransmitted.
+        """
+        pass
+
+    def on_message(self, event):
+        """
+        Called when a message is received. The message itself can be
+        obtained as a property on the event. For the purpose of
+        referring to this message in further actions (e.g. if
+        explicitly accepting it, the ``delivery`` should be used, also
+        obtainable via a property on the event.
+        """
+        pass
+
+
+class TransactionHandler(object):
+    """
+    The interface for transaction handlers, i.e. objects that want to
+    be notified of state changes related to a transaction.
+    """
+
+    def on_transaction_declared(self, event):
+        pass
+
+    def on_transaction_committed(self, event):
+        pass
+
+    def on_transaction_aborted(self, event):
+        pass
+
+    def on_transaction_declare_failed(self, event):
+        pass
+
+    def on_transaction_commit_failed(self, event):
+        pass
+
+
+class TransactionalClientHandler(MessagingHandler, TransactionHandler):
+    """
+    An extension to the MessagingHandler for applications using
+    transactions.
+    """
+
+    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
+        super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
+
+    def accept(self, delivery, transaction=None):
+        if transaction:
+            transaction.accept(delivery)
+        else:
+            super(TransactionalClientHandler, self).accept(delivery)
+
+
+class FlowController(Handler):
+    def __init__(self, window=1024):
+        self._window = window
+        self._drained = 0
+
+    def on_link_local_open(self, event):
+        self._flow(event.link)
+
+    def on_link_remote_open(self, event):
+        self._flow(event.link)
+
+    def on_link_flow(self, event):
+        self._flow(event.link)
+
+    def on_delivery(self, event):
+        self._flow(event.link)
+
+    def _flow(self, link):
+        if link.is_receiver:
+            self._drained += link.drained()
+            if self._drained == 0:
+                delta = self._window - link.credit
+                link.flow(delta)
+
+
+class Handshaker(Handler):
+
+    @staticmethod
+    def on_connection_remote_open(event):
+        conn = event.connection
+        if conn.state & Endpoint.LOCAL_UNINIT:
+            conn.open()
+
+    @staticmethod
+    def on_session_remote_open(event):
+        ssn = event.session
+        if ssn.state() & Endpoint.LOCAL_UNINIT:
+            ssn.open()
+
+    @staticmethod
+    def on_link_remote_open(event):
+        link = event.link
+        if link.state & Endpoint.LOCAL_UNINIT:
+            link.source.copy(link.remote_source)
+            link.target.copy(link.remote_target)
+            link.open()
+
+    @staticmethod
+    def on_connection_remote_close(event):
+        conn = event.connection
+        if not conn.state & Endpoint.LOCAL_CLOSED:
+            conn.close()
+
+    @staticmethod
+    def on_session_remote_close(event):
+        ssn = event.session
+        if not ssn.state & Endpoint.LOCAL_CLOSED:
+            ssn.close()
+
+    @staticmethod
+    def on_link_remote_close(event):
+        link = event.link
+        if not link.state & Endpoint.LOCAL_CLOSED:
+            link.close()
+
+
+# Back compatibility definitions
+CFlowController = FlowController
+CHandshaker = Handshaker
+
+
+from ._events import WrappedHandler
+from cproton import pn_iohandler
+
+class IOHandler(WrappedHandler):
+
+    def __init__(self):
+        WrappedHandler.__init__(self, pn_iohandler)
+
+
+class PythonIO:
+
+    def __init__(self):
+        self.selectables = []
+        self.delegate = IOHandler()
+
+    def on_unhandled(self, method, event):
+        event.dispatch(self.delegate)
+
+    def on_selectable_init(self, event):
+        self.selectables.append(event.context)
+
+    def on_selectable_updated(self, event):
+        pass
+
+    def on_selectable_final(self, event):
+        sel = event.context
+        if sel.is_terminal:
+            self.selectables.remove(sel)
+            sel.release()
+
+    def on_reactor_quiesced(self, event):
+        reactor = event.reactor
+        # check if we are still quiesced, other handlers of
+        # on_reactor_quiesced could have produced events to process
+        if not reactor.quiesced: return
+
+        reading = []
+        writing = []
+        deadline = None
+        for sel in self.selectables:
+            if sel.reading:
+                reading.append(sel)
+            if sel.writing:
+                writing.append(sel)
+            if sel.deadline:
+                if deadline is None:
+                    deadline = sel.deadline
+                else:
+                    deadline = min(sel.deadline, deadline)
+
+        if deadline is not None:
+            timeout = deadline - time.time()
+        else:
+            timeout = reactor.timeout
+        if (timeout < 0): timeout = 0
+        timeout = min(timeout, reactor.timeout)
+        readable, writable, _ = select(reading, writing, [], timeout)
+
+        reactor.mark()
+
+        now = time.time()
+
+        for s in readable:
+            s.readable()
+        for s in writable:
+            s.writable()
+        for s in self.selectables:
+            if s.deadline and now > s.deadline:
+                s.expired()
+
+        reactor.yield_()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
new file mode 100644
index 0000000..4548e59
--- /dev/null
+++ b/python/proton/_reactor.py
@@ -0,0 +1,982 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import os
+import logging
+import traceback
+import uuid
+
+from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
+    pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
+    pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
+    pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
+    pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
+    pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
+    pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
+    pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
+    pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
+
+from ._delivery import  Delivery
+from ._endpoints import Connection, Endpoint, Link, Session, Terminus
+from ._data import Described, symbol, ulong
+from ._message import  Message
+from ._transport import Transport, SSL, SSLDomain, SSLUnavailable
+from ._url import Url
+from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._events import EventType, EventBase, Handler
+from ._reactor_impl import Selectable, WrappedHandler, _chandler
+from ._wrapper import Wrapper, PYCTX
+
+from ._handlers import OutgoingMessageHandler
+
+from . import _compat
+from ._compat import queue
+
+log = logging.getLogger("proton")
+
+
+def generate_uuid():
+    return uuid.uuid4()
+
+
+def _timeout2millis(secs):
+    if secs is None: return PN_MILLIS_MAX
+    return secs2millis(secs)
+
+
+def _millis2timeout(millis):
+    if millis == PN_MILLIS_MAX: return None
+    return millis2secs(millis)
+
+
+class Task(Wrapper):
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            return Task(impl)
+
+    def __init__(self, impl):
+        Wrapper.__init__(self, impl, pn_task_attachments)
+
+    def _init(self):
+        pass
+
+    def cancel(self):
+        pn_task_cancel(self._impl)
+
+
+class Acceptor(Wrapper):
+
+    def __init__(self, impl):
+        Wrapper.__init__(self, impl)
+
+    def set_ssl_domain(self, ssl_domain):
+        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
+
+    def close(self):
+        pn_acceptor_close(self._impl)
+
+
+class Reactor(Wrapper):
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            record = pn_reactor_attachments(impl)
+            attrs = pn_void2py(pn_record_get(record, PYCTX))
+            if attrs and 'subclass' in attrs:
+                return attrs['subclass'](impl=impl)
+            else:
+                return Reactor(impl=impl)
+
+    def __init__(self, *handlers, **kwargs):
+        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
+        for h in handlers:
+            self.handler.add(h, on_error=self.on_error_delegate())
+
+    def _init(self):
+        self.errors = []
+
+    # on_error relay handler tied to underlying C reactor.  Use when the
+    # error will always be generated from a callback from this reactor.
+    # Needed to prevent reference cycles and be compatible with wrappers.
+    class ErrorDelegate(object):
+        def __init__(self, reactor):
+            self.reactor_impl = reactor._impl
+
+        def on_error(self, info):
+            ractor = Reactor.wrap(self.reactor_impl)
+            ractor.on_error(info)
+
+    def on_error_delegate(self):
+        return Reactor.ErrorDelegate(self).on_error
+
+    def on_error(self, info):
+        self.errors.append(info)
+        self.yield_()
+
+    def _get_global(self):
+        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
+
+    def _set_global(self, handler):
+        impl = _chandler(handler, self.on_error_delegate())
+        pn_reactor_set_global_handler(self._impl, impl)
+        pn_decref(impl)
+
+    global_handler = property(_get_global, _set_global)
+
+    def _get_timeout(self):
+        return _millis2timeout(pn_reactor_get_timeout(self._impl))
+
+    def _set_timeout(self, secs):
+        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
+
+    timeout = property(_get_timeout, _set_timeout)
+
+    def yield_(self):
+        pn_reactor_yield(self._impl)
+
+    def mark(self):
+        return pn_reactor_mark(self._impl)
+
+    def _get_handler(self):
+        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
+
+    def _set_handler(self, handler):
+        impl = _chandler(handler, self.on_error_delegate())
+        pn_reactor_set_handler(self._impl, impl)
+        pn_decref(impl)
+
+    handler = property(_get_handler, _set_handler)
+
+    def run(self):
+        self.timeout = 3.14159265359
+        self.start()
+        while self.process(): pass
+        self.stop()
+        self.process()
+        self.global_handler = None
+        self.handler = None
+
+    def wakeup(self):
+        n = pn_reactor_wakeup(self._impl)
+        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
+
+    def start(self):
+        pn_reactor_start(self._impl)
+
+    @property
+    def quiesced(self):
+        return pn_reactor_quiesced(self._impl)
+
+    def _check_errors(self):
+        if self.errors:
+            for exc, value, tb in self.errors[:-1]:
+                traceback.print_exception(exc, value, tb)
+            exc, value, tb = self.errors[-1]
+            _compat.raise_(exc, value, tb)
+
+    def process(self):
+        result = pn_reactor_process(self._impl)
+        self._check_errors()
+        return result
+
+    def stop(self):
+        pn_reactor_stop(self._impl)
+        self._check_errors()
+
+    def schedule(self, delay, task):
+        impl = _chandler(task, self.on_error_delegate())
+        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
+        pn_decref(impl)
+        return task
+
+    def acceptor(self, host, port, handler=None):
+        impl = _chandler(handler, self.on_error_delegate())
+        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
+        pn_decref(impl)
+        if aimpl:
+            return Acceptor(aimpl)
+        else:
+            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
+
+    def connection(self, handler=None):
+        """Deprecated: use connection_to_host() instead
+        """
+        impl = _chandler(handler, self.on_error_delegate())
+        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
+        if impl: pn_decref(impl)
+        return result
+
+    def connection_to_host(self, host, port, handler=None):
+        """Create an outgoing Connection that will be managed by the reactor.
+        The reactor's pn_iohandler will create a socket connection to the host
+        once the connection is opened.
+        """
+        conn = self.connection(handler)
+        self.set_connection_host(conn, host, port)
+        return conn
+
+    def set_connection_host(self, connection, host, port):
+        """Change the address used by the connection.  The address is
+        used by the reactor's iohandler to create an outgoing socket
+        connection.  This must be set prior to opening the connection.
+        """
+        pn_reactor_set_connection_host(self._impl,
+                                       connection._impl,
+                                       unicode2utf8(str(host)),
+                                       unicode2utf8(str(port)))
+
+    def get_connection_address(self, connection):
+        """This may be used to retrieve the remote peer address.
+        @return: string containing the address in URL format or None if no
+        address is available.  Use the proton.Url class to create a Url object
+        from the returned value.
+        """
+        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
+        return utf82unicode(_url)
+
+    def selectable(self, handler=None):
+        impl = _chandler(handler, self.on_error_delegate())
+        result = Selectable.wrap(pn_reactor_selectable(self._impl))
+        if impl:
+            record = pn_selectable_attachments(result._impl)
+            pn_record_set_handler(record, impl)
+            pn_decref(impl)
+        return result
+
+    def update(self, sel):
+        pn_reactor_update(self._impl, sel._impl)
+
+    def push_event(self, obj, etype):
+        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
+
+
+from ._events import wrappers as _wrappers
+
+_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
+_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
+
+
+class EventInjector(object):
+    """
+    Can be added to a reactor to allow events to be triggered by an
+    external thread but handled on the event thread associated with
+    the reactor. An instance of this class can be passed to the
+    Reactor.selectable() method of the reactor in order to activate
+    it. The close() method should be called when it is no longer
+    needed, to allow the event loop to end if needed.
+    """
+
+    def __init__(self):
+        self.queue = queue.Queue()
+        self.pipe = os.pipe()
+        self._closed = False
+
+    def trigger(self, event):
+        """
+        Request that the given event be dispatched on the event thread
+        of the reactor to which this EventInjector was added.
+        """
+        self.queue.put(event)
+        os.write(self.pipe[1], b"!")
+
+    def close(self):
+        """
+        Request that this EventInjector be closed. Existing events
+        will be dispatched on the reactors event dispatch thread,
+        then this will be removed from the set of interest.
+        """
+        self._closed = True
+        os.write(self.pipe[1], b"!")
+
+    def fileno(self):
+        return self.pipe[0]
+
+    def on_selectable_init(self, event):
+        sel = event.context
+        sel.fileno(self.fileno())
+        sel.reading = True
+        event.reactor.update(sel)
+
+    def on_selectable_readable(self, event):
+        os.read(self.pipe[0], 512)
+        while not self.queue.empty():
+            requested = self.queue.get()
+            event.reactor.push_event(requested.context, requested.type)
+        if self._closed:
+            s = event.context
+            s.terminate()
+            event.reactor.update(s)
+
+
+class ApplicationEvent(EventBase):
+    """
+    Application defined event, which can optionally be associated with
+    an engine object and or an arbitrary subject
+    """
+
+    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
+        self.connection = connection
+        self.session = session
+        self.link = link
+        self.delivery = delivery
+        if self.delivery:
+            self.link = self.delivery.link
+        if self.link:
+            self.session = self.link.session
+        if self.session:
+            self.connection = self.session.connection
+        self.subject = subject
+
+    def __repr__(self):
+        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
+        return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
+
+
+class Transaction(object):
+    """
+    Class to track state of an AMQP 1.0 transaction.
+    """
+
+    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
+        self.txn_ctrl = txn_ctrl
+        self.handler = handler
+        self.id = None
+        self._declare = None
+        self._discharge = None
+        self.failed = False
+        self._pending = []
+        self.settle_before_discharge = settle_before_discharge
+        self.declare()
+
+    def commit(self):
+        self.discharge(False)
+
+    def abort(self):
+        self.discharge(True)
+
+    def declare(self):
+        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
+
+    def discharge(self, failed):
+        self.failed = failed
+        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
+
+    def _send_ctrl(self, descriptor, value):
+        delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
+        delivery.transaction = self
+        return delivery
+
+    def send(self, sender, msg, tag=None):
+        dlv = sender.send(msg, tag=tag)
+        dlv.local.data = [self.id]
+        dlv.update(0x34)
+        return dlv
+
+    def accept(self, delivery):
+        self.update(delivery, PN_ACCEPTED)
+        if self.settle_before_discharge:
+            delivery.settle()
+        else:
+            self._pending.append(delivery)
+
+    def update(self, delivery, state=None):
+        if state:
+            delivery.local.data = [self.id, Described(ulong(state), [])]
+            delivery.update(0x34)
+
+    def _release_pending(self):
+        for d in self._pending:
+            d.update(Delivery.RELEASED)
+            d.settle()
+        self._clear_pending()
+
+    def _clear_pending(self):
+        self._pending = []
+
+    def handle_outcome(self, event):
+        if event.delivery == self._declare:
+            if event.delivery.remote.data:
+                self.id = event.delivery.remote.data[0]
+                self.handler.on_transaction_declared(event)
+            elif event.delivery.remote_state == Delivery.REJECTED:
+                self.handler.on_transaction_declare_failed(event)
+            else:
+                log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
+                self.handler.on_transaction_declare_failed(event)
+        elif event.delivery == self._discharge:
+            if event.delivery.remote_state == Delivery.REJECTED:
+                if not self.failed:
+                    self.handler.on_transaction_commit_failed(event)
+                    self._release_pending()  # make this optional?
+            else:
+                if self.failed:
+                    self.handler.on_transaction_aborted(event)
+                    self._release_pending()
+                else:
+                    self.handler.on_transaction_committed(event)
+            self._clear_pending()
+
+
+class LinkOption(object):
+    """
+    Abstract interface for link configuration options
+    """
+
+    def apply(self, link):
+        """
+        Subclasses will implement any configuration logic in this
+        method
+        """
+        pass
+
+    def test(self, link):
+        """
+        Subclasses can override this to selectively apply an option
+        e.g. based on some link criteria
+        """
+        return True
+
+
+class AtMostOnce(LinkOption):
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_SETTLED
+
+
+class AtLeastOnce(LinkOption):
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_UNSETTLED
+        link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class SenderOption(LinkOption):
+    def apply(self, sender): pass
+
+    def test(self, link): return link.is_sender
+
+
+class ReceiverOption(LinkOption):
+    def apply(self, receiver): pass
+
+    def test(self, link): return link.is_receiver
+
+
+class DynamicNodeProperties(LinkOption):
+    def __init__(self, props={}):
+        self.properties = {}
+        for k in props:
+            if isinstance(k, symbol):
+                self.properties[k] = props[k]
+            else:
+                self.properties[symbol(k)] = props[k]
+
+    def apply(self, link):
+        if link.is_receiver:
+            link.source.properties.put_dict(self.properties)
+        else:
+            link.target.properties.put_dict(self.properties)
+
+
+class Filter(ReceiverOption):
+    def __init__(self, filter_set={}):
+        self.filter_set = filter_set
+
+    def apply(self, receiver):
+        receiver.source.filter.put_dict(self.filter_set)
+
+
+class Selector(Filter):
+    """
+    Configures a link with a message selector filter
+    """
+
+    def __init__(self, value, name='selector'):
+        super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
+
+
+class DurableSubscription(ReceiverOption):
+    def apply(self, receiver):
+        receiver.source.durability = Terminus.DELIVERIES
+        receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
+
+
+class Move(ReceiverOption):
+    def apply(self, receiver):
+        receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
+
+
+class Copy(ReceiverOption):
+    def apply(self, receiver):
+        receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
+
+
+def _apply_link_options(options, link):
+    if options:
+        if isinstance(options, list):
+            for o in options:
+                if o.test(link): o.apply(link)
+        else:
+            if options.test(link): options.apply(link)
+
+
+def _create_session(connection, handler=None):
+    session = connection.session()
+    session.open()
+    return session
+
+
+def _get_attr(target, name):
+    if hasattr(target, name):
+        return getattr(target, name)
+    else:
+        return None
+
+
+class SessionPerConnection(object):
+    def __init__(self):
+        self._default_session = None
+
+    def session(self, connection):
+        if not self._default_session:
+            self._default_session = _create_session(connection)
+        return self._default_session
+
+
+class GlobalOverrides(object):
+    """
+    Internal handler that triggers the necessary socket connect for an
+    opened connection.
+    """
+
+    def __init__(self, base):
+        self.base = base
+
+    def on_unhandled(self, name, event):
+        if not self._override(event):
+            event.dispatch(self.base)
+
+    def _override(self, event):
+        conn = event.connection
+        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
+
+
+class Connector(Handler):
+    """
+    Internal handler that triggers the necessary socket connect for an
+    opened connection.
+    """
+
+    def __init__(self, connection):
+        self.connection = connection
+        self.address = None
+        self.heartbeat = None
+        self.reconnect = None
+        self.ssl_domain = None
+        self.allow_insecure_mechs = True
+        self.allowed_mechs = None
+        self.sasl_enabled = True
+        self.user = None
+        self.password = None
+        self.virtual_host = None
+        self.ssl_sni = None
+        self.max_frame_size = None
+
+    def _connect(self, connection, reactor):
+        assert (reactor is not None)
+        url = self.address.next()
+        reactor.set_connection_host(connection, url.host, str(url.port))
+        # if virtual-host not set, use host from address as default
+        if self.virtual_host is None:
+            connection.hostname = url.host
+        log.debug("connecting to %r..." % url)
+
+        transport = Transport()
+        if self.sasl_enabled:
+            sasl = transport.sasl()
+            sasl.allow_insecure_mechs = self.allow_insecure_mechs
+            if url.username:
+                connection.user = url.username
+            elif self.user:
+                connection.user = self.user
+            if url.password:
+                connection.password = url.password
+            elif self.password:
+                connection.password = self.password
+            if self.allowed_mechs:
+                sasl.allowed_mechs(self.allowed_mechs)
+        transport.bind(connection)
+        if self.heartbeat:
+            transport.idle_timeout = self.heartbeat
+        if url.scheme == 'amqps':
+            if not self.ssl_domain:
+                raise SSLUnavailable("amqps: SSL libraries not found")
+            self.ssl = SSL(transport, self.ssl_domain)
+            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
+        if self.max_frame_size:
+            transport.max_frame_size = self.max_frame_size
+
+    def on_connection_local_open(self, event):
+        self._connect(event.connection, event.reactor)
+
+    def on_connection_remote_open(self, event):
+        log.debug("connected to %s" % event.connection.hostname)
+        if self.reconnect:
+            self.reconnect.reset()
+            self.transport = None
+
+    def on_transport_tail_closed(self, event):
+        self.on_transport_closed(event)
+
+    def on_transport_closed(self, event):
+        if self.connection is None: return
+        if self.connection.state & Endpoint.LOCAL_ACTIVE:
+            if self.reconnect:
+                event.transport.unbind()
+                delay = self.reconnect.next()
+                if delay == 0:
+                    log.info("Disconnected, reconnecting...")
+                    self._connect(self.connection, event.reactor)
+                    return
+                else:
+                    log.info("Disconnected will try to reconnect after %s seconds" % delay)
+                    event.reactor.schedule(delay, self)
+                    return
+            else:
+                log.debug("Disconnected")
+        # See connector.cpp: conn.free()/pn_connection_release() here?
+        self.connection = None
+
+    def on_timer_task(self, event):
+        self._connect(self.connection, event.reactor)
+
+
+class Backoff(object):
+    """
+    A reconnect strategy involving an increasing delay between
+    retries, up to a maximum or 10 seconds.
+    """
+
+    def __init__(self):
+        self.delay = 0
+
+    def reset(self):
+        self.delay = 0
+
+    def next(self):
+        current = self.delay
+        if current == 0:
+            self.delay = 0.1
+        else:
+            self.delay = min(10, 2 * current)
+        return current
+
+
+class Urls(object):
+    def __init__(self, values):
+        self.values = [Url(v) for v in values]
+        self.i = iter(self.values)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        try:
+            return next(self.i)
+        except StopIteration:
+            self.i = iter(self.values)
+            return next(self.i)
+
+
+class SSLConfig(object):
+    def __init__(self):
+        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
+        self.server = SSLDomain(SSLDomain.MODE_SERVER)
+
+    def set_credentials(self, cert_file, key_file, password):
+        self.client.set_credentials(cert_file, key_file, password)
+        self.server.set_credentials(cert_file, key_file, password)
+
+    def set_trusted_ca_db(self, certificate_db):
+        self.client.set_trusted_ca_db(certificate_db)
+        self.server.set_trusted_ca_db(certificate_db)
+
+
+class Container(Reactor):
+    """A representation of the AMQP concept of a 'container', which
+       loosely speaking is something that establishes links to or from
+       another container, over which messages are transfered. This is
+       an extension to the Reactor class that adds convenience methods
+       for creating connections and sender- or receiver- links.
+    """
+
+    def __init__(self, *handlers, **kwargs):
+        super(Container, self).__init__(*handlers, **kwargs)
+        if "impl" not in kwargs:
+            try:
+                self.ssl = SSLConfig()
+            except SSLUnavailable:
+                self.ssl = None
+            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
+            self.trigger = None
+            self.container_id = str(generate_uuid())
+            self.allow_insecure_mechs = True
+            self.allowed_mechs = None
+            self.sasl_enabled = True
+            self.user = None
+            self.password = None
+            Wrapper.__setattr__(self, 'subclass', self.__class__)
+
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
+                **kwargs):
+        """
+        Initiates the establishment of an AMQP connection. Returns an
+        instance of proton.Connection.
+
+        @param url: URL string of process to connect to
+
+        @param urls: list of URL strings of process to try to connect to
+
+        Only one of url or urls should be specified.
+
+        @param reconnect: Reconnect is enabled by default.  You can
+        pass in an instance of Backoff to control reconnect behavior.
+        A value of False will prevent the library from automatically
+        trying to reconnect if the underlying socket is disconnected
+        before the connection has been closed.
+
+        @param heartbeat: A value in milliseconds indicating the
+        desired frequency of heartbeats used to test the underlying
+        socket is alive.
+
+        @param ssl_domain: SSL configuration in the form of an
+        instance of proton.SSLDomain.
+
+        @param handler: a connection scoped handler that will be
+        called to process any events in the scope of this connection
+        or its child links
+
+        @param kwargs: 'sasl_enabled', which determines whether a sasl
+        layer is used for the connection; 'allowed_mechs', an optional
+        string containing a space-separated list of SASL mechanisms to
+        allow if sasl is enabled; 'allow_insecure_mechs', a flag
+        indicating whether insecure mechanisms, such as PLAIN over a
+        non-encrypted socket, are allowed; 'virtual_host', the
+        hostname to set in the Open performative used by peer to
+        determine the correct back-end service for the client. If
+        'virtual_host' is not supplied the host field from the URL is
+        used instead; 'user', the user to authenticate; 'password',
+        the authentication secret.
+
+        """
+        conn = self.connection(handler)
+        conn.container = self.container_id or str(generate_uuid())
+        conn.offered_capabilities = kwargs.get('offered_capabilities')
+        conn.desired_capabilities = kwargs.get('desired_capabilities')
+        conn.properties = kwargs.get('properties')
+
+        connector = Connector(conn)
+        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
+        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
+        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
+        connector.user = kwargs.get('user', self.user)
+        connector.password = kwargs.get('password', self.password)
+        connector.virtual_host = kwargs.get('virtual_host')
+        if connector.virtual_host:
+            # only set hostname if virtual-host is a non-empty string
+            conn.hostname = connector.virtual_host
+        connector.ssl_sni = kwargs.get('sni')
+        connector.max_frame_size = kwargs.get('max_frame_size')
+
+        conn._overrides = connector
+        if url:
+            connector.address = Urls([url])
+        elif urls:
+            connector.address = Urls(urls)
+        elif address:
+            connector.address = address
+        else:
+            raise ValueError("One of url, urls or address required")
+        if heartbeat:
+            connector.heartbeat = heartbeat
+        if reconnect:
+            connector.reconnect = reconnect
+        elif reconnect is None:
+            connector.reconnect = Backoff()
+        # use container's default client domain if none specified.  This is
+        # only necessary of the URL specifies the "amqps:" scheme
+        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
+        conn._session_policy = SessionPerConnection()  # todo: make configurable
+        conn.open()
+        return conn
+
+    def _get_id(self, container, remote, local):
+        if local and remote:
+            "%s-%s-%s" % (container, remote, local)
+        elif local:
+            return "%s-%s" % (container, local)
+        elif remote:
+            return "%s-%s" % (container, remote)
+        else:
+            return "%s-%s" % (container, str(generate_uuid()))
+
+    def _get_session(self, context):
+        if isinstance(context, Url):
+            return self._get_session(self.connect(url=context))
+        elif isinstance(context, Session):
+            return context
+        elif isinstance(context, Connection):
+            if hasattr(context, '_session_policy'):
+                return context._session_policy.session(context)
+            else:
+                return _create_session(context)
+        else:
+            return context.session()
+
+    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
+        """
+        Initiates the establishment of a link over which messages can
+        be sent. Returns an instance of proton.Sender.
+
+        There are two patterns of use. (1) A connection can be passed
+        as the first argument, in which case the link is established
+        on that connection. In this case the target address can be
+        specified as the second argument (or as a keyword
+        argument). The source address can also be specified if
+        desired. (2) Alternatively a URL can be passed as the first
+        argument. In this case a new connection will be established on
+        which the link will be attached. If a path is specified and
+        the target is not, then the path of the URL is used as the
+        target address.
+
+        The name of the link may be specified if desired, otherwise a
+        unique name will be generated.
+
+        Various LinkOptions can be specified to further control the
+        attachment.
+        """
+        if isstring(context):
+            context = Url(context)
+        if isinstance(context, Url) and not target:
+            target = context.path
+        session = self._get_session(context)
+        snd = session.sender(name or self._get_id(session.connection.container, target, source))
+        if source:
+            snd.source.address = source
+        if target:
+            snd.target.address = target
+        if handler != None:
+            snd.handler = handler
+        if tags:
+            snd.tag_generator = tags
+        _apply_link_options(options, snd)
+        snd.open()
+        return snd
+
+    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
+        """
+        Initiates the establishment of a link over which messages can
+        be received (aka a subscription). Returns an instance of
+        proton.Receiver.
+
+        There are two patterns of use. (1) A connection can be passed
+        as the first argument, in which case the link is established
+        on that connection. In this case the source address can be
+        specified as the second argument (or as a keyword
+        argument). The target address can also be specified if
+        desired. (2) Alternatively a URL can be passed as the first
+        argument. In this case a new connection will be established on
+        which the link will be attached. If a path is specified and
+        the source is not, then the path of the URL is used as the
+        target address.
+
+        The name of the link may be specified if desired, otherwise a
+        unique name will be generated.
+
+        Various LinkOptions can be specified to further control the
+        attachment.
+        """
+        if isstring(context):
+            context = Url(context)
+        if isinstance(context, Url) and not source:
+            source = context.path
+        session = self._get_session(context)
+        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
+        if source:
+            rcv.source.address = source
+        if dynamic:
+            rcv.source.dynamic = True
+        if target:
+            rcv.target.address = target
+        if handler != None:
+            rcv.handler = handler
+        _apply_link_options(options, rcv)
+        rcv.open()
+        return rcv
+
+    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
+        if not _get_attr(context, '_txn_ctrl'):
+            class InternalTransactionHandler(OutgoingMessageHandler):
+                def __init__(self):
+                    super(InternalTransactionHandler, self).__init__(auto_settle=True)
+
+                def on_settled(self, event):
+                    if hasattr(event.delivery, "transaction"):
+                        event.transaction = event.delivery.transaction
+                        event.delivery.transaction.handle_outcome(event)
+
+                def on_unhandled(self, method, event):
+                    if handler:
+                        event.dispatch(handler)
+
+            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
+            context._txn_ctrl.target.type = Terminus.COORDINATOR
+            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
+
+    def listen(self, url, ssl_domain=None):
+        """
+        Initiates a server socket, accepting incoming AMQP connections
+        on the interface and port specified.
+        """
+        url = Url(url)
+        acceptor = self.acceptor(url.host, url.port)
+        ssl_config = ssl_domain
+        if not ssl_config and url.scheme == 'amqps':
+            # use container's default server domain
+            if self.ssl:
+                ssl_config = self.ssl.server
+            else:
+                raise SSLUnavailable("amqps: SSL libraries not found")
+        if ssl_config:
+            acceptor.set_ssl_domain(ssl_config)
+        return acceptor
+
+    def do_work(self, timeout=None):
+        if timeout:
+            self.timeout = timeout
+        return self.process()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_utils.py
----------------------------------------------------------------------
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
new file mode 100644
index 0000000..6462b55
--- /dev/null
+++ b/python/proton/_utils.py
@@ -0,0 +1,421 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import collections
+import time
+import threading
+
+from cproton import pn_reactor_collector, pn_collector_release
+
+from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
+from ._delivery import Delivery
+from ._endpoints import Endpoint, Link
+from ._events import Handler
+from ._url import Url
+
+from ._reactor import Container
+from ._handlers import MessagingHandler, IncomingMessageHandler
+
+
+class BlockingLink(object):
+    def __init__(self, connection, link):
+        self.connection = connection
+        self.link = link
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+                             msg="Opening link %s" % link.name)
+        self._checkClosed()
+
+    def _waitForClose(self, timeout=1):
+        try:
+            self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
+                                 timeout=timeout,
+                                 msg="Opening link %s" % self.link.name)
+        except Timeout as e:
+            pass
+        self._checkClosed()
+
+    def _checkClosed(self):
+        if self.link.state & Endpoint.REMOTE_CLOSED:
+            self.link.close()
+            if not self.connection.closing:
+                raise LinkDetached(self.link)
+
+    def close(self):
+        self.link.close()
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
+                             msg="Closing link %s" % self.link.name)
+
+    # Access to other link attributes.
+    def __getattr__(self, name):
+        return getattr(self.link, name)
+
+
+class SendException(ProtonException):
+    """
+    Exception used to indicate an exceptional state/condition on a send request
+    """
+
+    def __init__(self, state):
+        self.state = state
+
+
+def _is_settled(delivery):
+    return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
+
+
+class BlockingSender(BlockingLink):
+    def __init__(self, connection, sender):
+        super(BlockingSender, self).__init__(connection, sender)
+        if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
+            # this may be followed by a detach, which may contain an error condition, so wait a little...
+            self._waitForClose()
+            # ...but close ourselves if peer does not
+            self.link.close()
+            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
+
+    def send(self, msg, timeout=False, error_states=None):
+        delivery = self.link.send(msg)
+        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
+                             timeout=timeout)
+        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
+            delivery.settle()
+        bad = error_states
+        if bad is None:
+            bad = [Delivery.REJECTED, Delivery.RELEASED]
+        if delivery.remote_state in bad:
+            raise SendException(delivery.remote_state)
+        return delivery
+
+
+class Fetcher(MessagingHandler):
+    def __init__(self, connection, prefetch):
+        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
+        self.connection = connection
+        self.incoming = collections.deque([])
+        self.unsettled = collections.deque([])
+
+    def on_message(self, event):
+        self.incoming.append((event.message, event.delivery))
+        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
+
+    def on_link_error(self, event):
+        if event.link.state & Endpoint.LOCAL_ACTIVE:
+            event.link.close()
+            if not self.connection.closing:
+                raise LinkDetached(event.link)
+
+    def on_connection_error(self, event):
+        if not self.connection.closing:
+            raise ConnectionClosed(event.connection)
+
+    @property
+    def has_message(self):
+        return len(self.incoming)
+
+    def pop(self):
+        message, delivery = self.incoming.popleft()
+        if not delivery.settled:
+            self.unsettled.append(delivery)
+        return message
+
+    def settle(self, state=None):
+        delivery = self.unsettled.popleft()
+        if state:
+            delivery.update(state)
+        delivery.settle()
+
+
+class BlockingReceiver(BlockingLink):
+    def __init__(self, connection, receiver, fetcher, credit=1):
+        super(BlockingReceiver, self).__init__(connection, receiver)
+        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
+            # this may be followed by a detach, which may contain an error condition, so wait a little...
+            self._waitForClose()
+            # ...but close ourselves if peer does not
+            self.link.close()
+            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
+        if credit: receiver.flow(credit)
+        self.fetcher = fetcher
+        self.container = connection.container
+
+    def __del__(self):
+        self.fetcher = None
+        # The next line causes a core dump if the Proton-C reactor finalizes
+        # first.  The self.container reference prevents out of order reactor
+        # finalization. It may not be set if exception in BlockingLink.__init__
+        if hasattr(self, "container"):
+            self.link.handler = None  # implicit call to reactor
+
+    def receive(self, timeout=False):
+        if not self.fetcher:
+            raise Exception("Can't call receive on this receiver as a handler was provided")
+        if not self.link.credit:
+            self.link.flow(1)
+        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
+                             timeout=timeout)
+        return self.fetcher.pop()
+
+    def accept(self):
+        self.settle(Delivery.ACCEPTED)
+
+    def reject(self):
+        self.settle(Delivery.REJECTED)
+
+    def release(self, delivered=True):
+        if delivered:
+            self.settle(Delivery.MODIFIED)
+        else:
+            self.settle(Delivery.RELEASED)
+
+    def settle(self, state=None):
+        if not self.fetcher:
+            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
+        self.fetcher.settle(state)
+
+
+class LinkDetached(LinkException):
+    def __init__(self, link):
+        self.link = link
+        if link.is_sender:
+            txt = "sender %s to %s closed" % (link.name, link.target.address)
+        else:
+            txt = "receiver %s from %s closed" % (link.name, link.source.address)
+        if link.remote_condition:
+            txt += " due to: %s" % link.remote_condition
+            self.condition = link.remote_condition.name
+        else:
+            txt += " by peer"
+            self.condition = None
+        super(LinkDetached, self).__init__(txt)
+
+
+class ConnectionClosed(ConnectionException):
+    def __init__(self, connection):
+        self.connection = connection
+        txt = "Connection %s closed" % connection.hostname
+        if connection.remote_condition:
+            txt += " due to: %s" % connection.remote_condition
+            self.condition = connection.remote_condition.name
+        else:
+            txt += " by peer"
+            self.condition = None
+        super(ConnectionClosed, self).__init__(txt)
+
+
+class BlockingConnection(Handler):
+    """
+    A synchronous style connection wrapper.
+
+    This object's implementation uses OS resources.  To ensure they
+    are released when the object is no longer in use, make sure that
+    object operations are enclosed in a try block and that close() is
+    always executed on exit.
+    """
+
+    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
+        self.disconnected = False
+        self.timeout = timeout or 60
+        self.container = container or Container()
+        self.container.timeout = self.timeout
+        self.container.start()
+        self.url = Url(url).defaults()
+        self.conn = None
+        self.closing = False
+        failed = True
+        try:
+            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
+                                               heartbeat=heartbeat, **kwargs)
+            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+                      msg="Opening connection")
+            failed = False
+        finally:
+            if failed and self.conn:
+                self.close()
+
+    def create_sender(self, address, handler=None, name=None, options=None):
+        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
+                                                                 options=options))
+
+    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
+        prefetch = credit
+        if handler:
+            fetcher = None
+            if prefetch is None:
+                prefetch = 1
+        else:
+            fetcher = Fetcher(self, credit)
+        return BlockingReceiver(
+            self,
+            self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
+                                           options=options), fetcher, credit=prefetch)
+
+    def close(self):
+        # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
+        if self.closing:
+            return
+        self.closing = True
+        self.container.errors = []
+        try:
+            if self.conn:
+                self.conn.close()
+                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+                          msg="Closing connection")
+        finally:
+            self.conn.free()
+            # Nothing left to block on.  Allow reactor to clean up.
+            self.run()
+            self.conn = None
+            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
+            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
+            self.container = None
+
+    def _is_closed(self):
+        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
+
+    def run(self):
+        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+        while self.container.process(): pass
+        self.container.stop()
+        self.container.process()
+
+    def wait(self, condition, timeout=False, msg=None):
+        """Call process until condition() is true"""
+        if timeout is False:
+            timeout = self.timeout
+        if timeout is None:
+            while not condition() and not self.disconnected:
+                self.container.process()
+        else:
+            container_timeout = self.container.timeout
+            self.container.timeout = timeout
+            try:
+                deadline = time.time() + timeout
+                while not condition() and not self.disconnected:
+                    self.container.process()
+                    if deadline < time.time():
+                        txt = "Connection %s timed out" % self.url
+                        if msg: txt += ": " + msg
+                        raise Timeout(txt)
+            finally:
+                self.container.timeout = container_timeout
+        if self.disconnected or self._is_closed():
+            self.container.stop()
+            self.conn.handler = None  # break cyclical reference
+        if self.disconnected and not self._is_closed():
+            raise ConnectionException(
+                "Connection %s disconnected: %s" % (self.url, self.disconnected))
+
+    def on_link_remote_close(self, event):
+        if event.link.state & Endpoint.LOCAL_ACTIVE:
+            event.link.close()
+            if not self.closing:
+                raise LinkDetached(event.link)
+
+    def on_connection_remote_close(self, event):
+        if event.connection.state & Endpoint.LOCAL_ACTIVE:
+            event.connection.close()
+            if not self.closing:
+                raise ConnectionClosed(event.connection)
+
+    def on_transport_tail_closed(self, event):
+        self.on_transport_closed(event)
+
+    def on_transport_head_closed(self, event):
+        self.on_transport_closed(event)
+
+    def on_transport_closed(self, event):
+        self.disconnected = event.transport.condition or "unknown"
+
+
+class AtomicCount(object):
+    def __init__(self, start=0, step=1):
+        """Thread-safe atomic counter. Start at start, increment by step."""
+        self.count, self.step = start, step
+        self.lock = threading.Lock()
+
+    def next(self):
+        """Get the next value"""
+        self.lock.acquire()
+        self.count += self.step;
+        result = self.count
+        self.lock.release()
+        return result
+
+
+class SyncRequestResponse(IncomingMessageHandler):
+    """
+    Implementation of the synchronous request-response (aka RPC) pattern.
+    @ivar address: Address for all requests, may be None.
+    @ivar connection: Connection for requests and responses.
+    """
+
+    correlation_id = AtomicCount()
+
+    def __init__(self, connection, address=None):
+        """
+        Send requests and receive responses. A single instance can send many requests
+        to the same or different addresses.
+
+        @param connection: A L{BlockingConnection}
+        @param address: Address for all requests.
+            If not specified, each request must have the address property set.
+            Successive messages may have different addresses.
+        """
+        super(SyncRequestResponse, self).__init__()
+        self.connection = connection
+        self.address = address
+        self.sender = self.connection.create_sender(self.address)
+        # dynamic=true generates a unique address dynamically for this receiver.
+        # credit=1 because we want to receive 1 response message initially.
+        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
+        self.response = None
+
+    def call(self, request):
+        """
+        Send a request message, wait for and return the response message.
+
+        @param request: A L{proton.Message}. If L{self.address} is not set the 
+            L{self.address} must be set and will be used.
+        """
+        if not self.address and not request.address:
+            raise ValueError("Request message has no address: %s" % request)
+        request.reply_to = self.reply_to
+        request.correlation_id = correlation_id = str(self.correlation_id.next())
+        self.sender.send(request)
+
+        def wakeup():
+            return self.response and (self.response.correlation_id == correlation_id)
+
+        self.connection.wait(wakeup, msg="Waiting for response")
+        response = self.response
+        self.response = None  # Ready for next response.
+        self.receiver.flow(1)  # Set up credit for the next response.
+        return response
+
+    @property
+    def reply_to(self):
+        """Return the dynamic address of our receiver."""
+        return self.receiver.remote_source.address
+
+    def on_message(self, event):
+        """Called when we receive a message for our receiver."""
+        self.response = event.message
+        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org