You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/08/28 21:39:07 UTC
[7/7] qpid-proton git commit: PROTON-1922: [Python] Restrict exported
symbols from proton submodules - Restricted symbols exported by
proton.reactor, proton.handlers,
proton.utils - All symbols used by tests and examples are exported - Other
symbols that
PROTON-1922: [Python] Restrict exported symbols from proton submodules
- Restricted symbols exported by proton.reactor, proton.handlers, proton.utils
- All symbols used by tests and examples are exported
- Other symbols that seem to make sense to export are also exported
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5a566808
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5a566808
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5a566808
Branch: refs/heads/master
Commit: 5a56680848ab3165082ce2d280a4b109054c1b7e
Parents: c886daa
Author: Andrew Stitcher <as...@apache.org>
Authored: Tue Aug 28 17:33:59 2018 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Aug 28 17:33:59 2018 -0400
----------------------------------------------------------------------
python/proton/_endpoints.py | 8 +-
python/proton/_handlers.py | 745 +++++++++++++++++++++++++++++
python/proton/_reactor.py | 982 +++++++++++++++++++++++++++++++++++++++
python/proton/_utils.py | 421 +++++++++++++++++
python/proton/handlers.py | 743 +----------------------------
python/proton/reactor.py | 980 +-------------------------------------
python/proton/utils.py | 409 +---------------
7 files changed, 2199 insertions(+), 2089 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_endpoints.py
----------------------------------------------------------------------
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index bfa9880..bf72727 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -97,9 +97,9 @@ class Endpoint(object):
assert False, "Subclass must override this!"
def _get_handler(self):
- from . import reactor
+ from . import _reactor
from . import _reactor_impl
- ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+ ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
on_error = ractor.on_error_delegate()
else:
@@ -108,9 +108,9 @@ class Endpoint(object):
return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
def _set_handler(self, handler):
- from . import reactor
+ from . import _reactor
from . import _reactor_impl
- ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+ ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
if ractor:
on_error = ractor.on_error_delegate()
else:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
new file mode 100644
index 0000000..8b798f8
--- /dev/null
+++ b/python/proton/_handlers.py
@@ -0,0 +1,745 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import time
+import weakref
+from select import select
+
+from ._delivery import Delivery
+from ._endpoints import Endpoint
+from ._message import Message
+from ._exceptions import ProtonException
+from ._events import Handler, _dispatch
+
+log = logging.getLogger("proton")
+
+
+class OutgoingMessageHandler(Handler):
+ """
+ A utility for simpler and more intuitive handling of delivery
+ events related to outgoing i.e. sent messages.
+ """
+
+ def __init__(self, auto_settle=True, delegate=None):
+ self.auto_settle = auto_settle
+ self.delegate = delegate
+
+ def on_link_flow(self, event):
+ if event.link.is_sender and event.link.credit \
+ and event.link.state & Endpoint.LOCAL_ACTIVE \
+ and event.link.state & Endpoint.REMOTE_ACTIVE:
+ self.on_sendable(event)
+
+ def on_delivery(self, event):
+ dlv = event.delivery
+ if dlv.link.is_sender and dlv.updated:
+ if dlv.remote_state == Delivery.ACCEPTED:
+ self.on_accepted(event)
+ elif dlv.remote_state == Delivery.REJECTED:
+ self.on_rejected(event)
+ elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
+ self.on_released(event)
+ if dlv.settled:
+ self.on_settled(event)
+ if self.auto_settle:
+ dlv.settle()
+
+ def on_sendable(self, event):
+ """
+ Called when the sender link has credit and messages can
+ therefore be transferred.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_sendable', event)
+
+ def on_accepted(self, event):
+ """
+ Called when the remote peer accepts an outgoing message.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_accepted', event)
+
+ def on_rejected(self, event):
+ """
+ Called when the remote peer rejects an outgoing message.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_rejected', event)
+
+ def on_released(self, event):
+ """
+ Called when the remote peer releases an outgoing message. Note
+ that this may be in response to either the RELEASE or MODIFIED
+ state as defined by the AMQP specification.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_released', event)
+
+ def on_settled(self, event):
+ """
+ Called when the remote peer has settled the outgoing
+ message. This is the point at which it should never be
+ retransmitted.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_settled', event)
+
+
+def recv_msg(delivery):
+ msg = Message()
+ msg.decode(delivery.link.recv(delivery.pending))
+ delivery.link.advance()
+ return msg
+
+
+class Reject(ProtonException):
+ """
+ An exception that indicate a message should be rejected
+ """
+ pass
+
+
+class Release(ProtonException):
+ """
+ An exception that indicate a message should be rejected
+ """
+ pass
+
+
+class Acking(object):
+ def accept(self, delivery):
+ """
+ Accepts a received message.
+
+ Note that this method cannot currently be used in combination
+ with transactions.
+ """
+ self.settle(delivery, Delivery.ACCEPTED)
+
+ def reject(self, delivery):
+ """
+ Rejects a received message that is considered invalid or
+ unprocessable.
+ """
+ self.settle(delivery, Delivery.REJECTED)
+
+ def release(self, delivery, delivered=True):
+ """
+ Releases a received message, making it available at the source
+ for any (other) interested receiver. The ``delivered``
+ parameter indicates whether this should be considered a
+ delivery attempt (and the delivery count updated) or not.
+ """
+ if delivered:
+ self.settle(delivery, Delivery.MODIFIED)
+ else:
+ self.settle(delivery, Delivery.RELEASED)
+
+ def settle(self, delivery, state=None):
+ if state:
+ delivery.update(state)
+ delivery.settle()
+
+
+class IncomingMessageHandler(Handler, Acking):
+ """
+ A utility for simpler and more intuitive handling of delivery
+ events related to incoming i.e. received messages.
+ """
+
+ def __init__(self, auto_accept=True, delegate=None):
+ self.delegate = delegate
+ self.auto_accept = auto_accept
+
+ def on_delivery(self, event):
+ dlv = event.delivery
+ if not dlv.link.is_receiver: return
+ if dlv.aborted:
+ self.on_aborted(event)
+ dlv.settle()
+ elif dlv.readable and not dlv.partial:
+ event.message = recv_msg(dlv)
+ if event.link.state & Endpoint.LOCAL_CLOSED:
+ if self.auto_accept:
+ dlv.update(Delivery.RELEASED)
+ dlv.settle()
+ else:
+ try:
+ self.on_message(event)
+ if self.auto_accept:
+ dlv.update(Delivery.ACCEPTED)
+ dlv.settle()
+ except Reject:
+ dlv.update(Delivery.REJECTED)
+ dlv.settle()
+ except Release:
+ dlv.update(Delivery.MODIFIED)
+ dlv.settle()
+ elif dlv.updated and dlv.settled:
+ self.on_settled(event)
+
+ def on_message(self, event):
+ """
+ Called when a message is received. The message itself can be
+ obtained as a property on the event. For the purpose of
+ referring to this message in further actions (e.g. if
+ explicitly accepting it, the ``delivery`` should be used, also
+ obtainable via a property on the event.
+ """
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_message', event)
+
+ def on_settled(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_settled', event)
+
+ def on_aborted(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_aborted', event)
+
+
+class EndpointStateHandler(Handler):
+ """
+ A utility that exposes 'endpoint' events i.e. the open/close for
+ links, sessions and connections in a more intuitive manner. A
+ XXX_opened method will be called when both local and remote peers
+ have opened the link, session or connection. This can be used to
+ confirm a locally initiated action for example. A XXX_opening
+ method will be called when the remote peer has requested an open
+ that was not initiated locally. By default this will simply open
+ locally, which then triggers the XXX_opened call. The same applies
+ to close.
+ """
+
+ def __init__(self, peer_close_is_error=False, delegate=None):
+ self.delegate = delegate
+ self.peer_close_is_error = peer_close_is_error
+
+ @classmethod
+ def is_local_open(cls, endpoint):
+ return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+ @classmethod
+ def is_local_uninitialised(cls, endpoint):
+ return endpoint.state & Endpoint.LOCAL_UNINIT
+
+ @classmethod
+ def is_local_closed(cls, endpoint):
+ return endpoint.state & Endpoint.LOCAL_CLOSED
+
+ @classmethod
+ def is_remote_open(cls, endpoint):
+ return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+ @classmethod
+ def is_remote_closed(cls, endpoint):
+ return endpoint.state & Endpoint.REMOTE_CLOSED
+
+ @classmethod
+ def print_error(cls, endpoint, endpoint_type):
+ if endpoint.remote_condition:
+ log.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
+ elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
+ log.error("%s closed by peer" % endpoint_type)
+
+ def on_link_remote_close(self, event):
+ if event.link.remote_condition:
+ self.on_link_error(event)
+ elif self.is_local_closed(event.link):
+ self.on_link_closed(event)
+ else:
+ self.on_link_closing(event)
+ event.link.close()
+
+ def on_session_remote_close(self, event):
+ if event.session.remote_condition:
+ self.on_session_error(event)
+ elif self.is_local_closed(event.session):
+ self.on_session_closed(event)
+ else:
+ self.on_session_closing(event)
+ event.session.close()
+
+ def on_connection_remote_close(self, event):
+ if event.connection.remote_condition:
+ if event.connection.remote_condition.name == "amqp:connection:forced":
+ # Treat this the same as just having the transport closed by the peer without
+ # sending any events. Allow reconnection to happen transparently.
+ return
+ self.on_connection_error(event)
+ elif self.is_local_closed(event.connection):
+ self.on_connection_closed(event)
+ else:
+ self.on_connection_closing(event)
+ event.connection.close()
+
+ def on_connection_local_open(self, event):
+ if self.is_remote_open(event.connection):
+ self.on_connection_opened(event)
+
+ def on_connection_remote_open(self, event):
+ if self.is_local_open(event.connection):
+ self.on_connection_opened(event)
+ elif self.is_local_uninitialised(event.connection):
+ self.on_connection_opening(event)
+ event.connection.open()
+
+ def on_session_local_open(self, event):
+ if self.is_remote_open(event.session):
+ self.on_session_opened(event)
+
+ def on_session_remote_open(self, event):
+ if self.is_local_open(event.session):
+ self.on_session_opened(event)
+ elif self.is_local_uninitialised(event.session):
+ self.on_session_opening(event)
+ event.session.open()
+
+ def on_link_local_open(self, event):
+ if self.is_remote_open(event.link):
+ self.on_link_opened(event)
+
+ def on_link_remote_open(self, event):
+ if self.is_local_open(event.link):
+ self.on_link_opened(event)
+ elif self.is_local_uninitialised(event.link):
+ self.on_link_opening(event)
+ event.link.open()
+
+ def on_connection_opened(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_connection_opened', event)
+
+ def on_session_opened(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_session_opened', event)
+
+ def on_link_opened(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_link_opened', event)
+
+ def on_connection_opening(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_connection_opening', event)
+
+ def on_session_opening(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_session_opening', event)
+
+ def on_link_opening(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_link_opening', event)
+
+ def on_connection_error(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_connection_error', event)
+ else:
+ self.log_error(event.connection, "connection")
+
+ def on_session_error(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_session_error', event)
+ else:
+ self.log_error(event.session, "session")
+ event.connection.close()
+
+ def on_link_error(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_link_error', event)
+ else:
+ self.log_error(event.link, "link")
+ event.connection.close()
+
+ def on_connection_closed(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_connection_closed', event)
+
+ def on_session_closed(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_session_closed', event)
+
+ def on_link_closed(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_link_closed', event)
+
+ def on_connection_closing(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_connection_closing', event)
+ elif self.peer_close_is_error:
+ self.on_connection_error(event)
+
+ def on_session_closing(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_session_closing', event)
+ elif self.peer_close_is_error:
+ self.on_session_error(event)
+
+ def on_link_closing(self, event):
+ if self.delegate is not None:
+ _dispatch(self.delegate, 'on_link_closing', event)
+ elif self.peer_close_is_error:
+ self.on_link_error(event)
+
+ def on_transport_tail_closed(self, event):
+ self.on_transport_closed(event)
+
+ def on_transport_closed(self, event):
+ if self.delegate is not None and event.connection and self.is_local_open(event.connection):
+ _dispatch(self.delegate, 'on_disconnected', event)
+
+
+class MessagingHandler(Handler, Acking):
+ """
+ A general purpose handler that makes the proton-c events somewhat
+ simpler to deal with and/or avoids repetitive tasks for common use
+ cases.
+ """
+
+ def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
+ self.handlers = []
+ if prefetch:
+ self.handlers.append(FlowController(prefetch))
+ self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
+ self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
+ self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
+ self.fatal_conditions = ["amqp:unauthorized-access"]
+
+ def on_transport_error(self, event):
+ """
+ Called when some error is encountered with the transport over
+ which the AMQP connection is to be established. This includes
+ authentication errors as well as socket errors.
+ """
+ if event.transport.condition:
+ if event.transport.condition.info:
+ log.error("%s: %s: %s" % (
+ event.transport.condition.name, event.transport.condition.description,
+ event.transport.condition.info))
+ else:
+ log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
+ if event.transport.condition.name in self.fatal_conditions:
+ event.connection.close()
+ else:
+ logging.error("Unspecified transport error")
+
+ def on_connection_error(self, event):
+ """
+ Called when the peer closes the connection with an error condition.
+ """
+ EndpointStateHandler.print_error(event.connection, "connection")
+
+ def on_session_error(self, event):
+ """
+ Called when the peer closes the session with an error condition.
+ """
+ EndpointStateHandler.print_error(event.session, "session")
+ event.connection.close()
+
+ def on_link_error(self, event):
+ """
+ Called when the peer closes the link with an error condition.
+ """
+ EndpointStateHandler.print_error(event.link, "link")
+ event.connection.close()
+
+ def on_reactor_init(self, event):
+ """
+ Called when the event loop - the reactor - starts.
+ """
+ if hasattr(event.reactor, 'subclass'):
+ setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
+ self.on_start(event)
+
+ def on_start(self, event):
+ """
+ Called when the event loop starts. (Just an alias for on_reactor_init)
+ """
+ pass
+
+ def on_connection_closed(self, event):
+ """
+ Called when the connection is closed.
+ """
+ pass
+
+ def on_session_closed(self, event):
+ """
+ Called when the session is closed.
+ """
+ pass
+
+ def on_link_closed(self, event):
+ """
+ Called when the link is closed.
+ """
+ pass
+
+ def on_connection_closing(self, event):
+ """
+ Called when the peer initiates the closing of the connection.
+ """
+ pass
+
+ def on_session_closing(self, event):
+ """
+ Called when the peer initiates the closing of the session.
+ """
+ pass
+
+ def on_link_closing(self, event):
+ """
+ Called when the peer initiates the closing of the link.
+ """
+ pass
+
+ def on_disconnected(self, event):
+ """
+ Called when the socket is disconnected.
+ """
+ pass
+
+ def on_sendable(self, event):
+ """
+ Called when the sender link has credit and messages can
+ therefore be transferred.
+ """
+ pass
+
+ def on_accepted(self, event):
+ """
+ Called when the remote peer accepts an outgoing message.
+ """
+ pass
+
+ def on_rejected(self, event):
+ """
+ Called when the remote peer rejects an outgoing message.
+ """
+ pass
+
+ def on_released(self, event):
+ """
+ Called when the remote peer releases an outgoing message. Note
+ that this may be in response to either the RELEASE or MODIFIED
+ state as defined by the AMQP specification.
+ """
+ pass
+
+ def on_settled(self, event):
+ """
+ Called when the remote peer has settled the outgoing
+ message. This is the point at which it should never be
+ retransmitted.
+ """
+ pass
+
+ def on_message(self, event):
+ """
+ Called when a message is received. The message itself can be
+ obtained as a property on the event. For the purpose of
+ referring to this message in further actions (e.g. if
+ explicitly accepting it, the ``delivery`` should be used, also
+ obtainable via a property on the event.
+ """
+ pass
+
+
+class TransactionHandler(object):
+ """
+ The interface for transaction handlers, i.e. objects that want to
+ be notified of state changes related to a transaction.
+ """
+
+ def on_transaction_declared(self, event):
+ pass
+
+ def on_transaction_committed(self, event):
+ pass
+
+ def on_transaction_aborted(self, event):
+ pass
+
+ def on_transaction_declare_failed(self, event):
+ pass
+
+ def on_transaction_commit_failed(self, event):
+ pass
+
+
+class TransactionalClientHandler(MessagingHandler, TransactionHandler):
+ """
+ An extension to the MessagingHandler for applications using
+ transactions.
+ """
+
+ def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
+ super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
+
+ def accept(self, delivery, transaction=None):
+ if transaction:
+ transaction.accept(delivery)
+ else:
+ super(TransactionalClientHandler, self).accept(delivery)
+
+
+class FlowController(Handler):
+ def __init__(self, window=1024):
+ self._window = window
+ self._drained = 0
+
+ def on_link_local_open(self, event):
+ self._flow(event.link)
+
+ def on_link_remote_open(self, event):
+ self._flow(event.link)
+
+ def on_link_flow(self, event):
+ self._flow(event.link)
+
+ def on_delivery(self, event):
+ self._flow(event.link)
+
+ def _flow(self, link):
+ if link.is_receiver:
+ self._drained += link.drained()
+ if self._drained == 0:
+ delta = self._window - link.credit
+ link.flow(delta)
+
+
+class Handshaker(Handler):
+
+ @staticmethod
+ def on_connection_remote_open(event):
+ conn = event.connection
+ if conn.state & Endpoint.LOCAL_UNINIT:
+ conn.open()
+
+ @staticmethod
+ def on_session_remote_open(event):
+ ssn = event.session
+ if ssn.state() & Endpoint.LOCAL_UNINIT:
+ ssn.open()
+
+ @staticmethod
+ def on_link_remote_open(event):
+ link = event.link
+ if link.state & Endpoint.LOCAL_UNINIT:
+ link.source.copy(link.remote_source)
+ link.target.copy(link.remote_target)
+ link.open()
+
+ @staticmethod
+ def on_connection_remote_close(event):
+ conn = event.connection
+ if not conn.state & Endpoint.LOCAL_CLOSED:
+ conn.close()
+
+ @staticmethod
+ def on_session_remote_close(event):
+ ssn = event.session
+ if not ssn.state & Endpoint.LOCAL_CLOSED:
+ ssn.close()
+
+ @staticmethod
+ def on_link_remote_close(event):
+ link = event.link
+ if not link.state & Endpoint.LOCAL_CLOSED:
+ link.close()
+
+
+# Back compatibility definitions
+CFlowController = FlowController
+CHandshaker = Handshaker
+
+
+from ._events import WrappedHandler
+from cproton import pn_iohandler
+
+class IOHandler(WrappedHandler):
+
+ def __init__(self):
+ WrappedHandler.__init__(self, pn_iohandler)
+
+
+class PythonIO:
+
+ def __init__(self):
+ self.selectables = []
+ self.delegate = IOHandler()
+
+ def on_unhandled(self, method, event):
+ event.dispatch(self.delegate)
+
+ def on_selectable_init(self, event):
+ self.selectables.append(event.context)
+
+ def on_selectable_updated(self, event):
+ pass
+
+ def on_selectable_final(self, event):
+ sel = event.context
+ if sel.is_terminal:
+ self.selectables.remove(sel)
+ sel.release()
+
+ def on_reactor_quiesced(self, event):
+ reactor = event.reactor
+ # check if we are still quiesced, other handlers of
+ # on_reactor_quiesced could have produced events to process
+ if not reactor.quiesced: return
+
+ reading = []
+ writing = []
+ deadline = None
+ for sel in self.selectables:
+ if sel.reading:
+ reading.append(sel)
+ if sel.writing:
+ writing.append(sel)
+ if sel.deadline:
+ if deadline is None:
+ deadline = sel.deadline
+ else:
+ deadline = min(sel.deadline, deadline)
+
+ if deadline is not None:
+ timeout = deadline - time.time()
+ else:
+ timeout = reactor.timeout
+ if (timeout < 0): timeout = 0
+ timeout = min(timeout, reactor.timeout)
+ readable, writable, _ = select(reading, writing, [], timeout)
+
+ reactor.mark()
+
+ now = time.time()
+
+ for s in readable:
+ s.readable()
+ for s in writable:
+ s.writable()
+ for s in self.selectables:
+ if s.deadline and now > s.deadline:
+ s.expired()
+
+ reactor.yield_()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
new file mode 100644
index 0000000..4548e59
--- /dev/null
+++ b/python/proton/_reactor.py
@@ -0,0 +1,982 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import os
+import logging
+import traceback
+import uuid
+
+from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
+ pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
+ pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
+ pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
+ pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
+ pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
+ pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
+ pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
+ pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
+
+from ._delivery import Delivery
+from ._endpoints import Connection, Endpoint, Link, Session, Terminus
+from ._data import Described, symbol, ulong
+from ._message import Message
+from ._transport import Transport, SSL, SSLDomain, SSLUnavailable
+from ._url import Url
+from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._events import EventType, EventBase, Handler
+from ._reactor_impl import Selectable, WrappedHandler, _chandler
+from ._wrapper import Wrapper, PYCTX
+
+from ._handlers import OutgoingMessageHandler
+
+from . import _compat
+from ._compat import queue
+
+log = logging.getLogger("proton")
+
+
+def generate_uuid():
+ return uuid.uuid4()
+
+
+def _timeout2millis(secs):
+ if secs is None: return PN_MILLIS_MAX
+ return secs2millis(secs)
+
+
+def _millis2timeout(millis):
+ if millis == PN_MILLIS_MAX: return None
+ return millis2secs(millis)
+
+
+class Task(Wrapper):
+
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ return Task(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_task_attachments)
+
+ def _init(self):
+ pass
+
+ def cancel(self):
+ pn_task_cancel(self._impl)
+
+
+class Acceptor(Wrapper):
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl)
+
+ def set_ssl_domain(self, ssl_domain):
+ pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
+
+ def close(self):
+ pn_acceptor_close(self._impl)
+
+
+class Reactor(Wrapper):
+
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ record = pn_reactor_attachments(impl)
+ attrs = pn_void2py(pn_record_get(record, PYCTX))
+ if attrs and 'subclass' in attrs:
+ return attrs['subclass'](impl=impl)
+ else:
+ return Reactor(impl=impl)
+
+ def __init__(self, *handlers, **kwargs):
+ Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
+ for h in handlers:
+ self.handler.add(h, on_error=self.on_error_delegate())
+
+ def _init(self):
+ self.errors = []
+
+ # on_error relay handler tied to underlying C reactor. Use when the
+ # error will always be generated from a callback from this reactor.
+ # Needed to prevent reference cycles and be compatible with wrappers.
+ class ErrorDelegate(object):
+ def __init__(self, reactor):
+ self.reactor_impl = reactor._impl
+
+ def on_error(self, info):
+ ractor = Reactor.wrap(self.reactor_impl)
+ ractor.on_error(info)
+
+ def on_error_delegate(self):
+ return Reactor.ErrorDelegate(self).on_error
+
+ def on_error(self, info):
+ self.errors.append(info)
+ self.yield_()
+
+ def _get_global(self):
+ return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
+
+ def _set_global(self, handler):
+ impl = _chandler(handler, self.on_error_delegate())
+ pn_reactor_set_global_handler(self._impl, impl)
+ pn_decref(impl)
+
+ global_handler = property(_get_global, _set_global)
+
+ def _get_timeout(self):
+ return _millis2timeout(pn_reactor_get_timeout(self._impl))
+
+ def _set_timeout(self, secs):
+ return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
+
+ timeout = property(_get_timeout, _set_timeout)
+
+ def yield_(self):
+ pn_reactor_yield(self._impl)
+
+ def mark(self):
+ return pn_reactor_mark(self._impl)
+
+ def _get_handler(self):
+ return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
+
+ def _set_handler(self, handler):
+ impl = _chandler(handler, self.on_error_delegate())
+ pn_reactor_set_handler(self._impl, impl)
+ pn_decref(impl)
+
+ handler = property(_get_handler, _set_handler)
+
+ def run(self):
+ self.timeout = 3.14159265359
+ self.start()
+ while self.process(): pass
+ self.stop()
+ self.process()
+ self.global_handler = None
+ self.handler = None
+
+ def wakeup(self):
+ n = pn_reactor_wakeup(self._impl)
+ if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
+
+ def start(self):
+ pn_reactor_start(self._impl)
+
+ @property
+ def quiesced(self):
+ return pn_reactor_quiesced(self._impl)
+
+ def _check_errors(self):
+ if self.errors:
+ for exc, value, tb in self.errors[:-1]:
+ traceback.print_exception(exc, value, tb)
+ exc, value, tb = self.errors[-1]
+ _compat.raise_(exc, value, tb)
+
+ def process(self):
+ result = pn_reactor_process(self._impl)
+ self._check_errors()
+ return result
+
+ def stop(self):
+ pn_reactor_stop(self._impl)
+ self._check_errors()
+
+ def schedule(self, delay, task):
+ impl = _chandler(task, self.on_error_delegate())
+ task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
+ pn_decref(impl)
+ return task
+
+ def acceptor(self, host, port, handler=None):
+ impl = _chandler(handler, self.on_error_delegate())
+ aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
+ pn_decref(impl)
+ if aimpl:
+ return Acceptor(aimpl)
+ else:
+ raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
+
+ def connection(self, handler=None):
+ """Deprecated: use connection_to_host() instead
+ """
+ impl = _chandler(handler, self.on_error_delegate())
+ result = Connection.wrap(pn_reactor_connection(self._impl, impl))
+ if impl: pn_decref(impl)
+ return result
+
+ def connection_to_host(self, host, port, handler=None):
+ """Create an outgoing Connection that will be managed by the reactor.
+ The reactor's pn_iohandler will create a socket connection to the host
+ once the connection is opened.
+ """
+ conn = self.connection(handler)
+ self.set_connection_host(conn, host, port)
+ return conn
+
+ def set_connection_host(self, connection, host, port):
+ """Change the address used by the connection. The address is
+ used by the reactor's iohandler to create an outgoing socket
+ connection. This must be set prior to opening the connection.
+ """
+ pn_reactor_set_connection_host(self._impl,
+ connection._impl,
+ unicode2utf8(str(host)),
+ unicode2utf8(str(port)))
+
+ def get_connection_address(self, connection):
+ """This may be used to retrieve the remote peer address.
+ @return: string containing the address in URL format or None if no
+ address is available. Use the proton.Url class to create a Url object
+ from the returned value.
+ """
+ _url = pn_reactor_get_connection_address(self._impl, connection._impl)
+ return utf82unicode(_url)
+
+ def selectable(self, handler=None):
+ impl = _chandler(handler, self.on_error_delegate())
+ result = Selectable.wrap(pn_reactor_selectable(self._impl))
+ if impl:
+ record = pn_selectable_attachments(result._impl)
+ pn_record_set_handler(record, impl)
+ pn_decref(impl)
+ return result
+
+ def update(self, sel):
+ pn_reactor_update(self._impl, sel._impl)
+
+ def push_event(self, obj, etype):
+ pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
+
+
+from ._events import wrappers as _wrappers
+
+_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
+_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
+
+
+class EventInjector(object):
+ """
+ Can be added to a reactor to allow events to be triggered by an
+ external thread but handled on the event thread associated with
+ the reactor. An instance of this class can be passed to the
+ Reactor.selectable() method of the reactor in order to activate
+ it. The close() method should be called when it is no longer
+ needed, to allow the event loop to end if needed.
+ """
+
+ def __init__(self):
+ self.queue = queue.Queue()
+ self.pipe = os.pipe()
+ self._closed = False
+
+ def trigger(self, event):
+ """
+ Request that the given event be dispatched on the event thread
+ of the reactor to which this EventInjector was added.
+ """
+ self.queue.put(event)
+ os.write(self.pipe[1], b"!")
+
+ def close(self):
+ """
+ Request that this EventInjector be closed. Existing events
+ will be dispatched on the reactors event dispatch thread,
+ then this will be removed from the set of interest.
+ """
+ self._closed = True
+ os.write(self.pipe[1], b"!")
+
+ def fileno(self):
+ return self.pipe[0]
+
+ def on_selectable_init(self, event):
+ sel = event.context
+ sel.fileno(self.fileno())
+ sel.reading = True
+ event.reactor.update(sel)
+
+ def on_selectable_readable(self, event):
+ os.read(self.pipe[0], 512)
+ while not self.queue.empty():
+ requested = self.queue.get()
+ event.reactor.push_event(requested.context, requested.type)
+ if self._closed:
+ s = event.context
+ s.terminate()
+ event.reactor.update(s)
+
+
+class ApplicationEvent(EventBase):
+ """
+ Application defined event, which can optionally be associated with
+ an engine object and or an arbitrary subject
+ """
+
+ def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+ super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
+ self.connection = connection
+ self.session = session
+ self.link = link
+ self.delivery = delivery
+ if self.delivery:
+ self.link = self.delivery.link
+ if self.link:
+ self.session = self.link.session
+ if self.session:
+ self.connection = self.session.connection
+ self.subject = subject
+
+ def __repr__(self):
+ objects = [self.connection, self.session, self.link, self.delivery, self.subject]
+ return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
+
+
+class Transaction(object):
+ """
+ Class to track state of an AMQP 1.0 transaction.
+ """
+
+ def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
+ self.txn_ctrl = txn_ctrl
+ self.handler = handler
+ self.id = None
+ self._declare = None
+ self._discharge = None
+ self.failed = False
+ self._pending = []
+ self.settle_before_discharge = settle_before_discharge
+ self.declare()
+
+ def commit(self):
+ self.discharge(False)
+
+ def abort(self):
+ self.discharge(True)
+
+ def declare(self):
+ self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
+
+ def discharge(self, failed):
+ self.failed = failed
+ self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
+
+ def _send_ctrl(self, descriptor, value):
+ delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
+ delivery.transaction = self
+ return delivery
+
+ def send(self, sender, msg, tag=None):
+ dlv = sender.send(msg, tag=tag)
+ dlv.local.data = [self.id]
+ dlv.update(0x34)
+ return dlv
+
+ def accept(self, delivery):
+ self.update(delivery, PN_ACCEPTED)
+ if self.settle_before_discharge:
+ delivery.settle()
+ else:
+ self._pending.append(delivery)
+
+ def update(self, delivery, state=None):
+ if state:
+ delivery.local.data = [self.id, Described(ulong(state), [])]
+ delivery.update(0x34)
+
+ def _release_pending(self):
+ for d in self._pending:
+ d.update(Delivery.RELEASED)
+ d.settle()
+ self._clear_pending()
+
+ def _clear_pending(self):
+ self._pending = []
+
+ def handle_outcome(self, event):
+ if event.delivery == self._declare:
+ if event.delivery.remote.data:
+ self.id = event.delivery.remote.data[0]
+ self.handler.on_transaction_declared(event)
+ elif event.delivery.remote_state == Delivery.REJECTED:
+ self.handler.on_transaction_declare_failed(event)
+ else:
+ log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
+ self.handler.on_transaction_declare_failed(event)
+ elif event.delivery == self._discharge:
+ if event.delivery.remote_state == Delivery.REJECTED:
+ if not self.failed:
+ self.handler.on_transaction_commit_failed(event)
+ self._release_pending() # make this optional?
+ else:
+ if self.failed:
+ self.handler.on_transaction_aborted(event)
+ self._release_pending()
+ else:
+ self.handler.on_transaction_committed(event)
+ self._clear_pending()
+
+
+class LinkOption(object):
+ """
+ Abstract interface for link configuration options
+ """
+
+ def apply(self, link):
+ """
+ Subclasses will implement any configuration logic in this
+ method
+ """
+ pass
+
+ def test(self, link):
+ """
+ Subclasses can override this to selectively apply an option
+ e.g. based on some link criteria
+ """
+ return True
+
+
+class AtMostOnce(LinkOption):
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_SETTLED
+
+
+class AtLeastOnce(LinkOption):
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_UNSETTLED
+ link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class SenderOption(LinkOption):
+ def apply(self, sender): pass
+
+ def test(self, link): return link.is_sender
+
+
+class ReceiverOption(LinkOption):
+ def apply(self, receiver): pass
+
+ def test(self, link): return link.is_receiver
+
+
+class DynamicNodeProperties(LinkOption):
+ def __init__(self, props={}):
+ self.properties = {}
+ for k in props:
+ if isinstance(k, symbol):
+ self.properties[k] = props[k]
+ else:
+ self.properties[symbol(k)] = props[k]
+
+ def apply(self, link):
+ if link.is_receiver:
+ link.source.properties.put_dict(self.properties)
+ else:
+ link.target.properties.put_dict(self.properties)
+
+
+class Filter(ReceiverOption):
+ def __init__(self, filter_set={}):
+ self.filter_set = filter_set
+
+ def apply(self, receiver):
+ receiver.source.filter.put_dict(self.filter_set)
+
+
+class Selector(Filter):
+ """
+ Configures a link with a message selector filter
+ """
+
+ def __init__(self, value, name='selector'):
+ super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
+
+
+class DurableSubscription(ReceiverOption):
+ def apply(self, receiver):
+ receiver.source.durability = Terminus.DELIVERIES
+ receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
+
+
+class Move(ReceiverOption):
+ def apply(self, receiver):
+ receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
+
+
+class Copy(ReceiverOption):
+ def apply(self, receiver):
+ receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
+
+
+def _apply_link_options(options, link):
+ if options:
+ if isinstance(options, list):
+ for o in options:
+ if o.test(link): o.apply(link)
+ else:
+ if options.test(link): options.apply(link)
+
+
+def _create_session(connection, handler=None):
+ session = connection.session()
+ session.open()
+ return session
+
+
+def _get_attr(target, name):
+ if hasattr(target, name):
+ return getattr(target, name)
+ else:
+ return None
+
+
+class SessionPerConnection(object):
+ def __init__(self):
+ self._default_session = None
+
+ def session(self, connection):
+ if not self._default_session:
+ self._default_session = _create_session(connection)
+ return self._default_session
+
+
+class GlobalOverrides(object):
+ """
+ Internal handler that triggers the necessary socket connect for an
+ opened connection.
+ """
+
+ def __init__(self, base):
+ self.base = base
+
+ def on_unhandled(self, name, event):
+ if not self._override(event):
+ event.dispatch(self.base)
+
+ def _override(self, event):
+ conn = event.connection
+ return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
+
+
+class Connector(Handler):
+ """
+ Internal handler that triggers the necessary socket connect for an
+ opened connection.
+ """
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.address = None
+ self.heartbeat = None
+ self.reconnect = None
+ self.ssl_domain = None
+ self.allow_insecure_mechs = True
+ self.allowed_mechs = None
+ self.sasl_enabled = True
+ self.user = None
+ self.password = None
+ self.virtual_host = None
+ self.ssl_sni = None
+ self.max_frame_size = None
+
+ def _connect(self, connection, reactor):
+ assert (reactor is not None)
+ url = self.address.next()
+ reactor.set_connection_host(connection, url.host, str(url.port))
+ # if virtual-host not set, use host from address as default
+ if self.virtual_host is None:
+ connection.hostname = url.host
+ log.debug("connecting to %r..." % url)
+
+ transport = Transport()
+ if self.sasl_enabled:
+ sasl = transport.sasl()
+ sasl.allow_insecure_mechs = self.allow_insecure_mechs
+ if url.username:
+ connection.user = url.username
+ elif self.user:
+ connection.user = self.user
+ if url.password:
+ connection.password = url.password
+ elif self.password:
+ connection.password = self.password
+ if self.allowed_mechs:
+ sasl.allowed_mechs(self.allowed_mechs)
+ transport.bind(connection)
+ if self.heartbeat:
+ transport.idle_timeout = self.heartbeat
+ if url.scheme == 'amqps':
+ if not self.ssl_domain:
+ raise SSLUnavailable("amqps: SSL libraries not found")
+ self.ssl = SSL(transport, self.ssl_domain)
+ self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
+ if self.max_frame_size:
+ transport.max_frame_size = self.max_frame_size
+
+ def on_connection_local_open(self, event):
+ self._connect(event.connection, event.reactor)
+
+ def on_connection_remote_open(self, event):
+ log.debug("connected to %s" % event.connection.hostname)
+ if self.reconnect:
+ self.reconnect.reset()
+ self.transport = None
+
+ def on_transport_tail_closed(self, event):
+ self.on_transport_closed(event)
+
+ def on_transport_closed(self, event):
+ if self.connection is None: return
+ if self.connection.state & Endpoint.LOCAL_ACTIVE:
+ if self.reconnect:
+ event.transport.unbind()
+ delay = self.reconnect.next()
+ if delay == 0:
+ log.info("Disconnected, reconnecting...")
+ self._connect(self.connection, event.reactor)
+ return
+ else:
+ log.info("Disconnected will try to reconnect after %s seconds" % delay)
+ event.reactor.schedule(delay, self)
+ return
+ else:
+ log.debug("Disconnected")
+ # See connector.cpp: conn.free()/pn_connection_release() here?
+ self.connection = None
+
+ def on_timer_task(self, event):
+ self._connect(self.connection, event.reactor)
+
+
+class Backoff(object):
+ """
+ A reconnect strategy involving an increasing delay between
+ retries, up to a maximum or 10 seconds.
+ """
+
+ def __init__(self):
+ self.delay = 0
+
+ def reset(self):
+ self.delay = 0
+
+ def next(self):
+ current = self.delay
+ if current == 0:
+ self.delay = 0.1
+ else:
+ self.delay = min(10, 2 * current)
+ return current
+
+
+class Urls(object):
+ def __init__(self, values):
+ self.values = [Url(v) for v in values]
+ self.i = iter(self.values)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ try:
+ return next(self.i)
+ except StopIteration:
+ self.i = iter(self.values)
+ return next(self.i)
+
+
+class SSLConfig(object):
+ def __init__(self):
+ self.client = SSLDomain(SSLDomain.MODE_CLIENT)
+ self.server = SSLDomain(SSLDomain.MODE_SERVER)
+
+ def set_credentials(self, cert_file, key_file, password):
+ self.client.set_credentials(cert_file, key_file, password)
+ self.server.set_credentials(cert_file, key_file, password)
+
+ def set_trusted_ca_db(self, certificate_db):
+ self.client.set_trusted_ca_db(certificate_db)
+ self.server.set_trusted_ca_db(certificate_db)
+
+
+class Container(Reactor):
+ """A representation of the AMQP concept of a 'container', which
+ loosely speaking is something that establishes links to or from
+ another container, over which messages are transfered. This is
+ an extension to the Reactor class that adds convenience methods
+ for creating connections and sender- or receiver- links.
+ """
+
+ def __init__(self, *handlers, **kwargs):
+ super(Container, self).__init__(*handlers, **kwargs)
+ if "impl" not in kwargs:
+ try:
+ self.ssl = SSLConfig()
+ except SSLUnavailable:
+ self.ssl = None
+ self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
+ self.trigger = None
+ self.container_id = str(generate_uuid())
+ self.allow_insecure_mechs = True
+ self.allowed_mechs = None
+ self.sasl_enabled = True
+ self.user = None
+ self.password = None
+ Wrapper.__setattr__(self, 'subclass', self.__class__)
+
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
+ **kwargs):
+ """
+ Initiates the establishment of an AMQP connection. Returns an
+ instance of proton.Connection.
+
+ @param url: URL string of process to connect to
+
+ @param urls: list of URL strings of process to try to connect to
+
+ Only one of url or urls should be specified.
+
+ @param reconnect: Reconnect is enabled by default. You can
+ pass in an instance of Backoff to control reconnect behavior.
+ A value of False will prevent the library from automatically
+ trying to reconnect if the underlying socket is disconnected
+ before the connection has been closed.
+
+ @param heartbeat: A value in milliseconds indicating the
+ desired frequency of heartbeats used to test the underlying
+ socket is alive.
+
+ @param ssl_domain: SSL configuration in the form of an
+ instance of proton.SSLDomain.
+
+ @param handler: a connection scoped handler that will be
+ called to process any events in the scope of this connection
+ or its child links
+
+ @param kwargs: 'sasl_enabled', which determines whether a sasl
+ layer is used for the connection; 'allowed_mechs', an optional
+ string containing a space-separated list of SASL mechanisms to
+ allow if sasl is enabled; 'allow_insecure_mechs', a flag
+ indicating whether insecure mechanisms, such as PLAIN over a
+ non-encrypted socket, are allowed; 'virtual_host', the
+ hostname to set in the Open performative used by peer to
+ determine the correct back-end service for the client. If
+ 'virtual_host' is not supplied the host field from the URL is
+ used instead; 'user', the user to authenticate; 'password',
+ the authentication secret.
+
+ """
+ conn = self.connection(handler)
+ conn.container = self.container_id or str(generate_uuid())
+ conn.offered_capabilities = kwargs.get('offered_capabilities')
+ conn.desired_capabilities = kwargs.get('desired_capabilities')
+ conn.properties = kwargs.get('properties')
+
+ connector = Connector(conn)
+ connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
+ connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
+ connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
+ connector.user = kwargs.get('user', self.user)
+ connector.password = kwargs.get('password', self.password)
+ connector.virtual_host = kwargs.get('virtual_host')
+ if connector.virtual_host:
+ # only set hostname if virtual-host is a non-empty string
+ conn.hostname = connector.virtual_host
+ connector.ssl_sni = kwargs.get('sni')
+ connector.max_frame_size = kwargs.get('max_frame_size')
+
+ conn._overrides = connector
+ if url:
+ connector.address = Urls([url])
+ elif urls:
+ connector.address = Urls(urls)
+ elif address:
+ connector.address = address
+ else:
+ raise ValueError("One of url, urls or address required")
+ if heartbeat:
+ connector.heartbeat = heartbeat
+ if reconnect:
+ connector.reconnect = reconnect
+ elif reconnect is None:
+ connector.reconnect = Backoff()
+ # use container's default client domain if none specified. This is
+ # only necessary of the URL specifies the "amqps:" scheme
+ connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
+ conn._session_policy = SessionPerConnection() # todo: make configurable
+ conn.open()
+ return conn
+
+ def _get_id(self, container, remote, local):
+ if local and remote:
+ "%s-%s-%s" % (container, remote, local)
+ elif local:
+ return "%s-%s" % (container, local)
+ elif remote:
+ return "%s-%s" % (container, remote)
+ else:
+ return "%s-%s" % (container, str(generate_uuid()))
+
+ def _get_session(self, context):
+ if isinstance(context, Url):
+ return self._get_session(self.connect(url=context))
+ elif isinstance(context, Session):
+ return context
+ elif isinstance(context, Connection):
+ if hasattr(context, '_session_policy'):
+ return context._session_policy.session(context)
+ else:
+ return _create_session(context)
+ else:
+ return context.session()
+
+ def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
+ """
+ Initiates the establishment of a link over which messages can
+ be sent. Returns an instance of proton.Sender.
+
+ There are two patterns of use. (1) A connection can be passed
+ as the first argument, in which case the link is established
+ on that connection. In this case the target address can be
+ specified as the second argument (or as a keyword
+ argument). The source address can also be specified if
+ desired. (2) Alternatively a URL can be passed as the first
+ argument. In this case a new connection will be established on
+ which the link will be attached. If a path is specified and
+ the target is not, then the path of the URL is used as the
+ target address.
+
+ The name of the link may be specified if desired, otherwise a
+ unique name will be generated.
+
+ Various LinkOptions can be specified to further control the
+ attachment.
+ """
+ if isstring(context):
+ context = Url(context)
+ if isinstance(context, Url) and not target:
+ target = context.path
+ session = self._get_session(context)
+ snd = session.sender(name or self._get_id(session.connection.container, target, source))
+ if source:
+ snd.source.address = source
+ if target:
+ snd.target.address = target
+ if handler != None:
+ snd.handler = handler
+ if tags:
+ snd.tag_generator = tags
+ _apply_link_options(options, snd)
+ snd.open()
+ return snd
+
+ def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
+ """
+ Initiates the establishment of a link over which messages can
+ be received (aka a subscription). Returns an instance of
+ proton.Receiver.
+
+ There are two patterns of use. (1) A connection can be passed
+ as the first argument, in which case the link is established
+ on that connection. In this case the source address can be
+ specified as the second argument (or as a keyword
+ argument). The target address can also be specified if
+ desired. (2) Alternatively a URL can be passed as the first
+ argument. In this case a new connection will be established on
+ which the link will be attached. If a path is specified and
+ the source is not, then the path of the URL is used as the
+ target address.
+
+ The name of the link may be specified if desired, otherwise a
+ unique name will be generated.
+
+ Various LinkOptions can be specified to further control the
+ attachment.
+ """
+ if isstring(context):
+ context = Url(context)
+ if isinstance(context, Url) and not source:
+ source = context.path
+ session = self._get_session(context)
+ rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
+ if source:
+ rcv.source.address = source
+ if dynamic:
+ rcv.source.dynamic = True
+ if target:
+ rcv.target.address = target
+ if handler != None:
+ rcv.handler = handler
+ _apply_link_options(options, rcv)
+ rcv.open()
+ return rcv
+
+ def declare_transaction(self, context, handler=None, settle_before_discharge=False):
+ if not _get_attr(context, '_txn_ctrl'):
+ class InternalTransactionHandler(OutgoingMessageHandler):
+ def __init__(self):
+ super(InternalTransactionHandler, self).__init__(auto_settle=True)
+
+ def on_settled(self, event):
+ if hasattr(event.delivery, "transaction"):
+ event.transaction = event.delivery.transaction
+ event.delivery.transaction.handle_outcome(event)
+
+ def on_unhandled(self, method, event):
+ if handler:
+ event.dispatch(handler)
+
+ context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
+ context._txn_ctrl.target.type = Terminus.COORDINATOR
+ context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+ return Transaction(context._txn_ctrl, handler, settle_before_discharge)
+
+ def listen(self, url, ssl_domain=None):
+ """
+ Initiates a server socket, accepting incoming AMQP connections
+ on the interface and port specified.
+ """
+ url = Url(url)
+ acceptor = self.acceptor(url.host, url.port)
+ ssl_config = ssl_domain
+ if not ssl_config and url.scheme == 'amqps':
+ # use container's default server domain
+ if self.ssl:
+ ssl_config = self.ssl.server
+ else:
+ raise SSLUnavailable("amqps: SSL libraries not found")
+ if ssl_config:
+ acceptor.set_ssl_domain(ssl_config)
+ return acceptor
+
+ def do_work(self, timeout=None):
+ if timeout:
+ self.timeout = timeout
+ return self.process()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/_utils.py
----------------------------------------------------------------------
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
new file mode 100644
index 0000000..6462b55
--- /dev/null
+++ b/python/proton/_utils.py
@@ -0,0 +1,421 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import collections
+import time
+import threading
+
+from cproton import pn_reactor_collector, pn_collector_release
+
+from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
+from ._delivery import Delivery
+from ._endpoints import Endpoint, Link
+from ._events import Handler
+from ._url import Url
+
+from ._reactor import Container
+from ._handlers import MessagingHandler, IncomingMessageHandler
+
+
+class BlockingLink(object):
+ def __init__(self, connection, link):
+ self.connection = connection
+ self.link = link
+ self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+ msg="Opening link %s" % link.name)
+ self._checkClosed()
+
+ def _waitForClose(self, timeout=1):
+ try:
+ self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
+ timeout=timeout,
+ msg="Opening link %s" % self.link.name)
+ except Timeout as e:
+ pass
+ self._checkClosed()
+
+ def _checkClosed(self):
+ if self.link.state & Endpoint.REMOTE_CLOSED:
+ self.link.close()
+ if not self.connection.closing:
+ raise LinkDetached(self.link)
+
+ def close(self):
+ self.link.close()
+ self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
+ msg="Closing link %s" % self.link.name)
+
+ # Access to other link attributes.
+ def __getattr__(self, name):
+ return getattr(self.link, name)
+
+
+class SendException(ProtonException):
+ """
+ Exception used to indicate an exceptional state/condition on a send request
+ """
+
+ def __init__(self, state):
+ self.state = state
+
+
+def _is_settled(delivery):
+ return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
+
+
+class BlockingSender(BlockingLink):
+ def __init__(self, connection, sender):
+ super(BlockingSender, self).__init__(connection, sender)
+ if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
+ # this may be followed by a detach, which may contain an error condition, so wait a little...
+ self._waitForClose()
+ # ...but close ourselves if peer does not
+ self.link.close()
+ raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
+
+ def send(self, msg, timeout=False, error_states=None):
+ delivery = self.link.send(msg)
+ self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
+ timeout=timeout)
+ if delivery.link.snd_settle_mode != Link.SND_SETTLED:
+ delivery.settle()
+ bad = error_states
+ if bad is None:
+ bad = [Delivery.REJECTED, Delivery.RELEASED]
+ if delivery.remote_state in bad:
+ raise SendException(delivery.remote_state)
+ return delivery
+
+
+class Fetcher(MessagingHandler):
+ def __init__(self, connection, prefetch):
+ super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
+ self.connection = connection
+ self.incoming = collections.deque([])
+ self.unsettled = collections.deque([])
+
+ def on_message(self, event):
+ self.incoming.append((event.message, event.delivery))
+ self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+
+ def on_link_error(self, event):
+ if event.link.state & Endpoint.LOCAL_ACTIVE:
+ event.link.close()
+ if not self.connection.closing:
+ raise LinkDetached(event.link)
+
+ def on_connection_error(self, event):
+ if not self.connection.closing:
+ raise ConnectionClosed(event.connection)
+
+ @property
+ def has_message(self):
+ return len(self.incoming)
+
+ def pop(self):
+ message, delivery = self.incoming.popleft()
+ if not delivery.settled:
+ self.unsettled.append(delivery)
+ return message
+
+ def settle(self, state=None):
+ delivery = self.unsettled.popleft()
+ if state:
+ delivery.update(state)
+ delivery.settle()
+
+
+class BlockingReceiver(BlockingLink):
+ def __init__(self, connection, receiver, fetcher, credit=1):
+ super(BlockingReceiver, self).__init__(connection, receiver)
+ if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
+ # this may be followed by a detach, which may contain an error condition, so wait a little...
+ self._waitForClose()
+ # ...but close ourselves if peer does not
+ self.link.close()
+ raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
+ if credit: receiver.flow(credit)
+ self.fetcher = fetcher
+ self.container = connection.container
+
+ def __del__(self):
+ self.fetcher = None
+ # The next line causes a core dump if the Proton-C reactor finalizes
+ # first. The self.container reference prevents out of order reactor
+ # finalization. It may not be set if exception in BlockingLink.__init__
+ if hasattr(self, "container"):
+ self.link.handler = None # implicit call to reactor
+
+ def receive(self, timeout=False):
+ if not self.fetcher:
+ raise Exception("Can't call receive on this receiver as a handler was provided")
+ if not self.link.credit:
+ self.link.flow(1)
+ self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
+ timeout=timeout)
+ return self.fetcher.pop()
+
+ def accept(self):
+ self.settle(Delivery.ACCEPTED)
+
+ def reject(self):
+ self.settle(Delivery.REJECTED)
+
+ def release(self, delivered=True):
+ if delivered:
+ self.settle(Delivery.MODIFIED)
+ else:
+ self.settle(Delivery.RELEASED)
+
+ def settle(self, state=None):
+ if not self.fetcher:
+ raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
+ self.fetcher.settle(state)
+
+
+class LinkDetached(LinkException):
+ def __init__(self, link):
+ self.link = link
+ if link.is_sender:
+ txt = "sender %s to %s closed" % (link.name, link.target.address)
+ else:
+ txt = "receiver %s from %s closed" % (link.name, link.source.address)
+ if link.remote_condition:
+ txt += " due to: %s" % link.remote_condition
+ self.condition = link.remote_condition.name
+ else:
+ txt += " by peer"
+ self.condition = None
+ super(LinkDetached, self).__init__(txt)
+
+
+class ConnectionClosed(ConnectionException):
+ def __init__(self, connection):
+ self.connection = connection
+ txt = "Connection %s closed" % connection.hostname
+ if connection.remote_condition:
+ txt += " due to: %s" % connection.remote_condition
+ self.condition = connection.remote_condition.name
+ else:
+ txt += " by peer"
+ self.condition = None
+ super(ConnectionClosed, self).__init__(txt)
+
+
+class BlockingConnection(Handler):
+ """
+ A synchronous style connection wrapper.
+
+ This object's implementation uses OS resources. To ensure they
+ are released when the object is no longer in use, make sure that
+ object operations are enclosed in a try block and that close() is
+ always executed on exit.
+ """
+
+ def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
+ self.disconnected = False
+ self.timeout = timeout or 60
+ self.container = container or Container()
+ self.container.timeout = self.timeout
+ self.container.start()
+ self.url = Url(url).defaults()
+ self.conn = None
+ self.closing = False
+ failed = True
+ try:
+ self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
+ heartbeat=heartbeat, **kwargs)
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+ msg="Opening connection")
+ failed = False
+ finally:
+ if failed and self.conn:
+ self.close()
+
+ def create_sender(self, address, handler=None, name=None, options=None):
+ return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
+ options=options))
+
+ def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
+ prefetch = credit
+ if handler:
+ fetcher = None
+ if prefetch is None:
+ prefetch = 1
+ else:
+ fetcher = Fetcher(self, credit)
+ return BlockingReceiver(
+ self,
+ self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
+ options=options), fetcher, credit=prefetch)
+
+ def close(self):
+ # TODO: provide stronger interrupt protection on cleanup. See PEP 419
+ if self.closing:
+ return
+ self.closing = True
+ self.container.errors = []
+ try:
+ if self.conn:
+ self.conn.close()
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+ msg="Closing connection")
+ finally:
+ self.conn.free()
+ # Nothing left to block on. Allow reactor to clean up.
+ self.run()
+ self.conn = None
+ self.container.global_handler = None # break circular ref: container to cadapter.on_error
+ pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
+ self.container = None
+
+ def _is_closed(self):
+ return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
+
+ def run(self):
+ """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+ while self.container.process(): pass
+ self.container.stop()
+ self.container.process()
+
+ def wait(self, condition, timeout=False, msg=None):
+ """Call process until condition() is true"""
+ if timeout is False:
+ timeout = self.timeout
+ if timeout is None:
+ while not condition() and not self.disconnected:
+ self.container.process()
+ else:
+ container_timeout = self.container.timeout
+ self.container.timeout = timeout
+ try:
+ deadline = time.time() + timeout
+ while not condition() and not self.disconnected:
+ self.container.process()
+ if deadline < time.time():
+ txt = "Connection %s timed out" % self.url
+ if msg: txt += ": " + msg
+ raise Timeout(txt)
+ finally:
+ self.container.timeout = container_timeout
+ if self.disconnected or self._is_closed():
+ self.container.stop()
+ self.conn.handler = None # break cyclical reference
+ if self.disconnected and not self._is_closed():
+ raise ConnectionException(
+ "Connection %s disconnected: %s" % (self.url, self.disconnected))
+
+ def on_link_remote_close(self, event):
+ if event.link.state & Endpoint.LOCAL_ACTIVE:
+ event.link.close()
+ if not self.closing:
+ raise LinkDetached(event.link)
+
+ def on_connection_remote_close(self, event):
+ if event.connection.state & Endpoint.LOCAL_ACTIVE:
+ event.connection.close()
+ if not self.closing:
+ raise ConnectionClosed(event.connection)
+
+ def on_transport_tail_closed(self, event):
+ self.on_transport_closed(event)
+
+ def on_transport_head_closed(self, event):
+ self.on_transport_closed(event)
+
+ def on_transport_closed(self, event):
+ self.disconnected = event.transport.condition or "unknown"
+
+
+class AtomicCount(object):
+ def __init__(self, start=0, step=1):
+ """Thread-safe atomic counter. Start at start, increment by step."""
+ self.count, self.step = start, step
+ self.lock = threading.Lock()
+
+ def next(self):
+ """Get the next value"""
+ self.lock.acquire()
+ self.count += self.step;
+ result = self.count
+ self.lock.release()
+ return result
+
+
+class SyncRequestResponse(IncomingMessageHandler):
+ """
+ Implementation of the synchronous request-response (aka RPC) pattern.
+ @ivar address: Address for all requests, may be None.
+ @ivar connection: Connection for requests and responses.
+ """
+
+ correlation_id = AtomicCount()
+
+ def __init__(self, connection, address=None):
+ """
+ Send requests and receive responses. A single instance can send many requests
+ to the same or different addresses.
+
+ @param connection: A L{BlockingConnection}
+ @param address: Address for all requests.
+ If not specified, each request must have the address property set.
+ Successive messages may have different addresses.
+ """
+ super(SyncRequestResponse, self).__init__()
+ self.connection = connection
+ self.address = address
+ self.sender = self.connection.create_sender(self.address)
+ # dynamic=true generates a unique address dynamically for this receiver.
+ # credit=1 because we want to receive 1 response message initially.
+ self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
+ self.response = None
+
+ def call(self, request):
+ """
+ Send a request message, wait for and return the response message.
+
+ @param request: A L{proton.Message}. If L{self.address} is not set the
+ L{self.address} must be set and will be used.
+ """
+ if not self.address and not request.address:
+ raise ValueError("Request message has no address: %s" % request)
+ request.reply_to = self.reply_to
+ request.correlation_id = correlation_id = str(self.correlation_id.next())
+ self.sender.send(request)
+
+ def wakeup():
+ return self.response and (self.response.correlation_id == correlation_id)
+
+ self.connection.wait(wakeup, msg="Waiting for response")
+ response = self.response
+ self.response = None # Ready for next response.
+ self.receiver.flow(1) # Set up credit for the next response.
+ return response
+
+ @property
+ def reply_to(self):
+ """Return the dynamic address of our receiver."""
+ return self.receiver.remote_source.address
+
+ def on_message(self, event):
+ """Called when we receive a message for our receiver."""
+ self.response = event.message
+ self.connection.container.yield_() # Wake up the wait() loop to handle the message.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org