You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/08/28 21:39:06 UTC
[6/7] qpid-proton git commit: PROTON-1922: [Python] Restrict exported symbols from proton submodules
  - Restricted symbols exported by proton.reactor, proton.handlers, proton.utils
  - All symbols used by tests and examples are exported
  - Other symbols that [... summary truncated in the archived message]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 8b798f8..c324812 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -19,727 +19,22 @@
from __future__ import absolute_import
-import logging
-import time
-import weakref
-from select import select
-
-from ._delivery import Delivery
-from ._endpoints import Endpoint
-from ._message import Message
-from ._exceptions import ProtonException
-from ._events import Handler, _dispatch
-
-log = logging.getLogger("proton")
-
-
-class OutgoingMessageHandler(Handler):
- """
- A utility for simpler and more intuitive handling of delivery
- events related to outgoing i.e. sent messages.
- """
-
- def __init__(self, auto_settle=True, delegate=None):
- self.auto_settle = auto_settle
- self.delegate = delegate
-
- def on_link_flow(self, event):
- if event.link.is_sender and event.link.credit \
- and event.link.state & Endpoint.LOCAL_ACTIVE \
- and event.link.state & Endpoint.REMOTE_ACTIVE:
- self.on_sendable(event)
-
- def on_delivery(self, event):
- dlv = event.delivery
- if dlv.link.is_sender and dlv.updated:
- if dlv.remote_state == Delivery.ACCEPTED:
- self.on_accepted(event)
- elif dlv.remote_state == Delivery.REJECTED:
- self.on_rejected(event)
- elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
- self.on_released(event)
- if dlv.settled:
- self.on_settled(event)
- if self.auto_settle:
- dlv.settle()
-
- def on_sendable(self, event):
- """
- Called when the sender link has credit and messages can
- therefore be transferred.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_sendable', event)
-
- def on_accepted(self, event):
- """
- Called when the remote peer accepts an outgoing message.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_accepted', event)
-
- def on_rejected(self, event):
- """
- Called when the remote peer rejects an outgoing message.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_rejected', event)
-
- def on_released(self, event):
- """
- Called when the remote peer releases an outgoing message. Note
- that this may be in response to either the RELEASE or MODIFIED
- state as defined by the AMQP specification.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_released', event)
-
- def on_settled(self, event):
- """
- Called when the remote peer has settled the outgoing
- message. This is the point at which it should never be
- retransmitted.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_settled', event)
-
-
-def recv_msg(delivery):
- msg = Message()
- msg.decode(delivery.link.recv(delivery.pending))
- delivery.link.advance()
- return msg
-
-
-class Reject(ProtonException):
- """
- An exception that indicate a message should be rejected
- """
- pass
-
-
-class Release(ProtonException):
- """
- An exception that indicate a message should be rejected
- """
- pass
-
-
-class Acking(object):
- def accept(self, delivery):
- """
- Accepts a received message.
-
- Note that this method cannot currently be used in combination
- with transactions.
- """
- self.settle(delivery, Delivery.ACCEPTED)
-
- def reject(self, delivery):
- """
- Rejects a received message that is considered invalid or
- unprocessable.
- """
- self.settle(delivery, Delivery.REJECTED)
-
- def release(self, delivery, delivered=True):
- """
- Releases a received message, making it available at the source
- for any (other) interested receiver. The ``delivered``
- parameter indicates whether this should be considered a
- delivery attempt (and the delivery count updated) or not.
- """
- if delivered:
- self.settle(delivery, Delivery.MODIFIED)
- else:
- self.settle(delivery, Delivery.RELEASED)
-
- def settle(self, delivery, state=None):
- if state:
- delivery.update(state)
- delivery.settle()
-
-
-class IncomingMessageHandler(Handler, Acking):
- """
- A utility for simpler and more intuitive handling of delivery
- events related to incoming i.e. received messages.
- """
-
- def __init__(self, auto_accept=True, delegate=None):
- self.delegate = delegate
- self.auto_accept = auto_accept
-
- def on_delivery(self, event):
- dlv = event.delivery
- if not dlv.link.is_receiver: return
- if dlv.aborted:
- self.on_aborted(event)
- dlv.settle()
- elif dlv.readable and not dlv.partial:
- event.message = recv_msg(dlv)
- if event.link.state & Endpoint.LOCAL_CLOSED:
- if self.auto_accept:
- dlv.update(Delivery.RELEASED)
- dlv.settle()
- else:
- try:
- self.on_message(event)
- if self.auto_accept:
- dlv.update(Delivery.ACCEPTED)
- dlv.settle()
- except Reject:
- dlv.update(Delivery.REJECTED)
- dlv.settle()
- except Release:
- dlv.update(Delivery.MODIFIED)
- dlv.settle()
- elif dlv.updated and dlv.settled:
- self.on_settled(event)
-
- def on_message(self, event):
- """
- Called when a message is received. The message itself can be
- obtained as a property on the event. For the purpose of
- referring to this message in further actions (e.g. if
- explicitly accepting it, the ``delivery`` should be used, also
- obtainable via a property on the event.
- """
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_message', event)
-
- def on_settled(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_settled', event)
-
- def on_aborted(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_aborted', event)
-
-
-class EndpointStateHandler(Handler):
- """
- A utility that exposes 'endpoint' events i.e. the open/close for
- links, sessions and connections in a more intuitive manner. A
- XXX_opened method will be called when both local and remote peers
- have opened the link, session or connection. This can be used to
- confirm a locally initiated action for example. A XXX_opening
- method will be called when the remote peer has requested an open
- that was not initiated locally. By default this will simply open
- locally, which then triggers the XXX_opened call. The same applies
- to close.
- """
-
- def __init__(self, peer_close_is_error=False, delegate=None):
- self.delegate = delegate
- self.peer_close_is_error = peer_close_is_error
-
- @classmethod
- def is_local_open(cls, endpoint):
- return endpoint.state & Endpoint.LOCAL_ACTIVE
-
- @classmethod
- def is_local_uninitialised(cls, endpoint):
- return endpoint.state & Endpoint.LOCAL_UNINIT
-
- @classmethod
- def is_local_closed(cls, endpoint):
- return endpoint.state & Endpoint.LOCAL_CLOSED
-
- @classmethod
- def is_remote_open(cls, endpoint):
- return endpoint.state & Endpoint.REMOTE_ACTIVE
-
- @classmethod
- def is_remote_closed(cls, endpoint):
- return endpoint.state & Endpoint.REMOTE_CLOSED
-
- @classmethod
- def print_error(cls, endpoint, endpoint_type):
- if endpoint.remote_condition:
- log.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
- elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
- log.error("%s closed by peer" % endpoint_type)
-
- def on_link_remote_close(self, event):
- if event.link.remote_condition:
- self.on_link_error(event)
- elif self.is_local_closed(event.link):
- self.on_link_closed(event)
- else:
- self.on_link_closing(event)
- event.link.close()
-
- def on_session_remote_close(self, event):
- if event.session.remote_condition:
- self.on_session_error(event)
- elif self.is_local_closed(event.session):
- self.on_session_closed(event)
- else:
- self.on_session_closing(event)
- event.session.close()
-
- def on_connection_remote_close(self, event):
- if event.connection.remote_condition:
- if event.connection.remote_condition.name == "amqp:connection:forced":
- # Treat this the same as just having the transport closed by the peer without
- # sending any events. Allow reconnection to happen transparently.
- return
- self.on_connection_error(event)
- elif self.is_local_closed(event.connection):
- self.on_connection_closed(event)
- else:
- self.on_connection_closing(event)
- event.connection.close()
-
- def on_connection_local_open(self, event):
- if self.is_remote_open(event.connection):
- self.on_connection_opened(event)
-
- def on_connection_remote_open(self, event):
- if self.is_local_open(event.connection):
- self.on_connection_opened(event)
- elif self.is_local_uninitialised(event.connection):
- self.on_connection_opening(event)
- event.connection.open()
-
- def on_session_local_open(self, event):
- if self.is_remote_open(event.session):
- self.on_session_opened(event)
-
- def on_session_remote_open(self, event):
- if self.is_local_open(event.session):
- self.on_session_opened(event)
- elif self.is_local_uninitialised(event.session):
- self.on_session_opening(event)
- event.session.open()
-
- def on_link_local_open(self, event):
- if self.is_remote_open(event.link):
- self.on_link_opened(event)
-
- def on_link_remote_open(self, event):
- if self.is_local_open(event.link):
- self.on_link_opened(event)
- elif self.is_local_uninitialised(event.link):
- self.on_link_opening(event)
- event.link.open()
-
- def on_connection_opened(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_connection_opened', event)
-
- def on_session_opened(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_session_opened', event)
-
- def on_link_opened(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_link_opened', event)
-
- def on_connection_opening(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_connection_opening', event)
-
- def on_session_opening(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_session_opening', event)
-
- def on_link_opening(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_link_opening', event)
-
- def on_connection_error(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_connection_error', event)
- else:
- self.log_error(event.connection, "connection")
-
- def on_session_error(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_session_error', event)
- else:
- self.log_error(event.session, "session")
- event.connection.close()
-
- def on_link_error(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_link_error', event)
- else:
- self.log_error(event.link, "link")
- event.connection.close()
-
- def on_connection_closed(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_connection_closed', event)
-
- def on_session_closed(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_session_closed', event)
-
- def on_link_closed(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_link_closed', event)
-
- def on_connection_closing(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_connection_closing', event)
- elif self.peer_close_is_error:
- self.on_connection_error(event)
-
- def on_session_closing(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_session_closing', event)
- elif self.peer_close_is_error:
- self.on_session_error(event)
-
- def on_link_closing(self, event):
- if self.delegate is not None:
- _dispatch(self.delegate, 'on_link_closing', event)
- elif self.peer_close_is_error:
- self.on_link_error(event)
-
- def on_transport_tail_closed(self, event):
- self.on_transport_closed(event)
-
- def on_transport_closed(self, event):
- if self.delegate is not None and event.connection and self.is_local_open(event.connection):
- _dispatch(self.delegate, 'on_disconnected', event)
-
-
-class MessagingHandler(Handler, Acking):
- """
- A general purpose handler that makes the proton-c events somewhat
- simpler to deal with and/or avoids repetitive tasks for common use
- cases.
- """
-
- def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
- self.handlers = []
- if prefetch:
- self.handlers.append(FlowController(prefetch))
- self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
- self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
- self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
- self.fatal_conditions = ["amqp:unauthorized-access"]
-
- def on_transport_error(self, event):
- """
- Called when some error is encountered with the transport over
- which the AMQP connection is to be established. This includes
- authentication errors as well as socket errors.
- """
- if event.transport.condition:
- if event.transport.condition.info:
- log.error("%s: %s: %s" % (
- event.transport.condition.name, event.transport.condition.description,
- event.transport.condition.info))
- else:
- log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
- if event.transport.condition.name in self.fatal_conditions:
- event.connection.close()
- else:
- logging.error("Unspecified transport error")
-
- def on_connection_error(self, event):
- """
- Called when the peer closes the connection with an error condition.
- """
- EndpointStateHandler.print_error(event.connection, "connection")
-
- def on_session_error(self, event):
- """
- Called when the peer closes the session with an error condition.
- """
- EndpointStateHandler.print_error(event.session, "session")
- event.connection.close()
-
- def on_link_error(self, event):
- """
- Called when the peer closes the link with an error condition.
- """
- EndpointStateHandler.print_error(event.link, "link")
- event.connection.close()
-
- def on_reactor_init(self, event):
- """
- Called when the event loop - the reactor - starts.
- """
- if hasattr(event.reactor, 'subclass'):
- setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
- self.on_start(event)
-
- def on_start(self, event):
- """
- Called when the event loop starts. (Just an alias for on_reactor_init)
- """
- pass
-
- def on_connection_closed(self, event):
- """
- Called when the connection is closed.
- """
- pass
-
- def on_session_closed(self, event):
- """
- Called when the session is closed.
- """
- pass
-
- def on_link_closed(self, event):
- """
- Called when the link is closed.
- """
- pass
-
- def on_connection_closing(self, event):
- """
- Called when the peer initiates the closing of the connection.
- """
- pass
-
- def on_session_closing(self, event):
- """
- Called when the peer initiates the closing of the session.
- """
- pass
-
- def on_link_closing(self, event):
- """
- Called when the peer initiates the closing of the link.
- """
- pass
-
- def on_disconnected(self, event):
- """
- Called when the socket is disconnected.
- """
- pass
-
- def on_sendable(self, event):
- """
- Called when the sender link has credit and messages can
- therefore be transferred.
- """
- pass
-
- def on_accepted(self, event):
- """
- Called when the remote peer accepts an outgoing message.
- """
- pass
-
- def on_rejected(self, event):
- """
- Called when the remote peer rejects an outgoing message.
- """
- pass
-
- def on_released(self, event):
- """
- Called when the remote peer releases an outgoing message. Note
- that this may be in response to either the RELEASE or MODIFIED
- state as defined by the AMQP specification.
- """
- pass
-
- def on_settled(self, event):
- """
- Called when the remote peer has settled the outgoing
- message. This is the point at which it should never be
- retransmitted.
- """
- pass
-
- def on_message(self, event):
- """
- Called when a message is received. The message itself can be
- obtained as a property on the event. For the purpose of
- referring to this message in further actions (e.g. if
- explicitly accepting it, the ``delivery`` should be used, also
- obtainable via a property on the event.
- """
- pass
-
-
-class TransactionHandler(object):
- """
- The interface for transaction handlers, i.e. objects that want to
- be notified of state changes related to a transaction.
- """
-
- def on_transaction_declared(self, event):
- pass
-
- def on_transaction_committed(self, event):
- pass
-
- def on_transaction_aborted(self, event):
- pass
-
- def on_transaction_declare_failed(self, event):
- pass
-
- def on_transaction_commit_failed(self, event):
- pass
-
-
-class TransactionalClientHandler(MessagingHandler, TransactionHandler):
- """
- An extension to the MessagingHandler for applications using
- transactions.
- """
-
- def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
- super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
-
- def accept(self, delivery, transaction=None):
- if transaction:
- transaction.accept(delivery)
- else:
- super(TransactionalClientHandler, self).accept(delivery)
-
-
-class FlowController(Handler):
- def __init__(self, window=1024):
- self._window = window
- self._drained = 0
-
- def on_link_local_open(self, event):
- self._flow(event.link)
-
- def on_link_remote_open(self, event):
- self._flow(event.link)
-
- def on_link_flow(self, event):
- self._flow(event.link)
-
- def on_delivery(self, event):
- self._flow(event.link)
-
- def _flow(self, link):
- if link.is_receiver:
- self._drained += link.drained()
- if self._drained == 0:
- delta = self._window - link.credit
- link.flow(delta)
-
-
-class Handshaker(Handler):
-
- @staticmethod
- def on_connection_remote_open(event):
- conn = event.connection
- if conn.state & Endpoint.LOCAL_UNINIT:
- conn.open()
-
- @staticmethod
- def on_session_remote_open(event):
- ssn = event.session
- if ssn.state() & Endpoint.LOCAL_UNINIT:
- ssn.open()
-
- @staticmethod
- def on_link_remote_open(event):
- link = event.link
- if link.state & Endpoint.LOCAL_UNINIT:
- link.source.copy(link.remote_source)
- link.target.copy(link.remote_target)
- link.open()
-
- @staticmethod
- def on_connection_remote_close(event):
- conn = event.connection
- if not conn.state & Endpoint.LOCAL_CLOSED:
- conn.close()
-
- @staticmethod
- def on_session_remote_close(event):
- ssn = event.session
- if not ssn.state & Endpoint.LOCAL_CLOSED:
- ssn.close()
-
- @staticmethod
- def on_link_remote_close(event):
- link = event.link
- if not link.state & Endpoint.LOCAL_CLOSED:
- link.close()
-
-
-# Back compatibility definitions
-CFlowController = FlowController
-CHandshaker = Handshaker
-
-
-from ._events import WrappedHandler
-from cproton import pn_iohandler
-
-class IOHandler(WrappedHandler):
-
- def __init__(self):
- WrappedHandler.__init__(self, pn_iohandler)
-
-
-class PythonIO:
-
- def __init__(self):
- self.selectables = []
- self.delegate = IOHandler()
-
- def on_unhandled(self, method, event):
- event.dispatch(self.delegate)
-
- def on_selectable_init(self, event):
- self.selectables.append(event.context)
-
- def on_selectable_updated(self, event):
- pass
-
- def on_selectable_final(self, event):
- sel = event.context
- if sel.is_terminal:
- self.selectables.remove(sel)
- sel.release()
-
- def on_reactor_quiesced(self, event):
- reactor = event.reactor
- # check if we are still quiesced, other handlers of
- # on_reactor_quiesced could have produced events to process
- if not reactor.quiesced: return
-
- reading = []
- writing = []
- deadline = None
- for sel in self.selectables:
- if sel.reading:
- reading.append(sel)
- if sel.writing:
- writing.append(sel)
- if sel.deadline:
- if deadline is None:
- deadline = sel.deadline
- else:
- deadline = min(sel.deadline, deadline)
-
- if deadline is not None:
- timeout = deadline - time.time()
- else:
- timeout = reactor.timeout
- if (timeout < 0): timeout = 0
- timeout = min(timeout, reactor.timeout)
- readable, writable, _ = select(reading, writing, [], timeout)
-
- reactor.mark()
-
- now = time.time()
-
- for s in readable:
- s.readable()
- for s in writable:
- s.writable()
- for s in self.selectables:
- if s.deadline and now > s.deadline:
- s.expired()
-
- reactor.yield_()
+from ._handlers import MessagingHandler, IncomingMessageHandler, OutgoingMessageHandler, \
+ EndpointStateHandler, TransactionHandler, TransactionalClientHandler,\
+ Reject, Release,\
+ Handshaker, FlowController, IOHandler, PythonIO
+
+__all__ = [
+ 'MessagingHandler',
+ 'IncomingMessageHandler',
+ 'OutgoingMessageHandler',
+ 'EndpointStateHandler',
+ 'TransactionHandler',
+ 'TransactionalClientHandler',
+ 'Reject',
+ 'Release',
+ 'Handshaker',
+ 'FlowController',
+ 'IOHandler',
+ 'PythonIO'
+]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/reactor.py b/python/proton/reactor.py
index ab9c7db..998b86a 100644
--- a/python/proton/reactor.py
+++ b/python/proton/reactor.py
@@ -19,964 +19,22 @@
from __future__ import absolute_import
-import os
-import logging
-import traceback
-import uuid
-
-from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
- pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
- pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
- pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
- pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
- pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
- pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
- pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
- pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
-
-from ._delivery import Delivery
-from ._endpoints import Connection, Endpoint, Link, Session, Terminus
-from ._data import Described, symbol, ulong
-from ._message import Message
-from ._transport import Transport, SSL, SSLDomain, SSLUnavailable
-from ._url import Url
-from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
-from ._events import EventType, EventBase, Handler
-from ._reactor_impl import Selectable, WrappedHandler, _chandler
-from ._wrapper import Wrapper, PYCTX
-
-from .handlers import OutgoingMessageHandler
-
-from . import _compat
-from ._compat import queue
-
-log = logging.getLogger("proton")
-
-
-def generate_uuid():
- return uuid.uuid4()
-
-
-def _timeout2millis(secs):
- if secs is None: return PN_MILLIS_MAX
- return secs2millis(secs)
-
-
-def _millis2timeout(millis):
- if millis == PN_MILLIS_MAX: return None
- return millis2secs(millis)
-
-
-class Task(Wrapper):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Task(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_task_attachments)
-
- def _init(self):
- pass
-
- def cancel(self):
- pn_task_cancel(self._impl)
-
-
-class Acceptor(Wrapper):
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl)
-
- def set_ssl_domain(self, ssl_domain):
- pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
-
- def close(self):
- pn_acceptor_close(self._impl)
-
-
-class Reactor(Wrapper):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- record = pn_reactor_attachments(impl)
- attrs = pn_void2py(pn_record_get(record, PYCTX))
- if attrs and 'subclass' in attrs:
- return attrs['subclass'](impl=impl)
- else:
- return Reactor(impl=impl)
-
- def __init__(self, *handlers, **kwargs):
- Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
- for h in handlers:
- self.handler.add(h, on_error=self.on_error_delegate())
-
- def _init(self):
- self.errors = []
-
- # on_error relay handler tied to underlying C reactor. Use when the
- # error will always be generated from a callback from this reactor.
- # Needed to prevent reference cycles and be compatible with wrappers.
- class ErrorDelegate(object):
- def __init__(self, reactor):
- self.reactor_impl = reactor._impl
-
- def on_error(self, info):
- ractor = Reactor.wrap(self.reactor_impl)
- ractor.on_error(info)
-
- def on_error_delegate(self):
- return Reactor.ErrorDelegate(self).on_error
-
- def on_error(self, info):
- self.errors.append(info)
- self.yield_()
-
- def _get_global(self):
- return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
-
- def _set_global(self, handler):
- impl = _chandler(handler, self.on_error_delegate())
- pn_reactor_set_global_handler(self._impl, impl)
- pn_decref(impl)
-
- global_handler = property(_get_global, _set_global)
-
- def _get_timeout(self):
- return _millis2timeout(pn_reactor_get_timeout(self._impl))
-
- def _set_timeout(self, secs):
- return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
-
- timeout = property(_get_timeout, _set_timeout)
-
- def yield_(self):
- pn_reactor_yield(self._impl)
-
- def mark(self):
- return pn_reactor_mark(self._impl)
-
- def _get_handler(self):
- return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
-
- def _set_handler(self, handler):
- impl = _chandler(handler, self.on_error_delegate())
- pn_reactor_set_handler(self._impl, impl)
- pn_decref(impl)
-
- handler = property(_get_handler, _set_handler)
-
- def run(self):
- self.timeout = 3.14159265359
- self.start()
- while self.process(): pass
- self.stop()
- self.process()
- self.global_handler = None
- self.handler = None
-
- def wakeup(self):
- n = pn_reactor_wakeup(self._impl)
- if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
-
- def start(self):
- pn_reactor_start(self._impl)
-
- @property
- def quiesced(self):
- return pn_reactor_quiesced(self._impl)
-
- def _check_errors(self):
- if self.errors:
- for exc, value, tb in self.errors[:-1]:
- traceback.print_exception(exc, value, tb)
- exc, value, tb = self.errors[-1]
- _compat.raise_(exc, value, tb)
-
- def process(self):
- result = pn_reactor_process(self._impl)
- self._check_errors()
- return result
-
- def stop(self):
- pn_reactor_stop(self._impl)
- self._check_errors()
-
- def schedule(self, delay, task):
- impl = _chandler(task, self.on_error_delegate())
- task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
- pn_decref(impl)
- return task
-
- def acceptor(self, host, port, handler=None):
- impl = _chandler(handler, self.on_error_delegate())
- aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
- pn_decref(impl)
- if aimpl:
- return Acceptor(aimpl)
- else:
- raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
-
- def connection(self, handler=None):
- """Deprecated: use connection_to_host() instead
- """
- impl = _chandler(handler, self.on_error_delegate())
- result = Connection.wrap(pn_reactor_connection(self._impl, impl))
- if impl: pn_decref(impl)
- return result
-
- def connection_to_host(self, host, port, handler=None):
- """Create an outgoing Connection that will be managed by the reactor.
- The reactor's pn_iohandler will create a socket connection to the host
- once the connection is opened.
- """
- conn = self.connection(handler)
- self.set_connection_host(conn, host, port)
- return conn
-
- def set_connection_host(self, connection, host, port):
- """Change the address used by the connection. The address is
- used by the reactor's iohandler to create an outgoing socket
- connection. This must be set prior to opening the connection.
- """
- pn_reactor_set_connection_host(self._impl,
- connection._impl,
- unicode2utf8(str(host)),
- unicode2utf8(str(port)))
-
- def get_connection_address(self, connection):
- """This may be used to retrieve the remote peer address.
- @return: string containing the address in URL format or None if no
- address is available. Use the proton.Url class to create a Url object
- from the returned value.
- """
- _url = pn_reactor_get_connection_address(self._impl, connection._impl)
- return utf82unicode(_url)
-
- def selectable(self, handler=None):
- impl = _chandler(handler, self.on_error_delegate())
- result = Selectable.wrap(pn_reactor_selectable(self._impl))
- if impl:
- record = pn_selectable_attachments(result._impl)
- pn_record_set_handler(record, impl)
- pn_decref(impl)
- return result
-
- def update(self, sel):
- pn_reactor_update(self._impl, sel._impl)
-
- def push_event(self, obj, etype):
- pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-
-from ._events import wrappers as _wrappers
-
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
-
-
-class EventInjector(object):
- """
- Can be added to a reactor to allow events to be triggered by an
- external thread but handled on the event thread associated with
- the reactor. An instance of this class can be passed to the
- Reactor.selectable() method of the reactor in order to activate
- it. The close() method should be called when it is no longer
- needed, to allow the event loop to end if needed.
- """
-
- def __init__(self):
- self.queue = queue.Queue()
- self.pipe = os.pipe()
- self._closed = False
-
- def trigger(self, event):
- """
- Request that the given event be dispatched on the event thread
- of the reactor to which this EventInjector was added.
- """
- self.queue.put(event)
- os.write(self.pipe[1], b"!")
-
- def close(self):
- """
- Request that this EventInjector be closed. Existing events
- will be dispatched on the reactors event dispatch thread,
- then this will be removed from the set of interest.
- """
- self._closed = True
- os.write(self.pipe[1], b"!")
-
- def fileno(self):
- return self.pipe[0]
-
- def on_selectable_init(self, event):
- sel = event.context
- sel.fileno(self.fileno())
- sel.reading = True
- event.reactor.update(sel)
-
- def on_selectable_readable(self, event):
- os.read(self.pipe[0], 512)
- while not self.queue.empty():
- requested = self.queue.get()
- event.reactor.push_event(requested.context, requested.type)
- if self._closed:
- s = event.context
- s.terminate()
- event.reactor.update(s)
-
-
-class ApplicationEvent(EventBase):
- """
- Application defined event, which can optionally be associated with
- an engine object and or an arbitrary subject
- """
-
- def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
- super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
- self.connection = connection
- self.session = session
- self.link = link
- self.delivery = delivery
- if self.delivery:
- self.link = self.delivery.link
- if self.link:
- self.session = self.link.session
- if self.session:
- self.connection = self.session.connection
- self.subject = subject
-
- def __repr__(self):
- objects = [self.connection, self.session, self.link, self.delivery, self.subject]
- return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
-
-
-class Transaction(object):
- """
- Class to track state of an AMQP 1.0 transaction.
- """
-
- def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
- self.txn_ctrl = txn_ctrl
- self.handler = handler
- self.id = None
- self._declare = None
- self._discharge = None
- self.failed = False
- self._pending = []
- self.settle_before_discharge = settle_before_discharge
- self.declare()
-
- def commit(self):
- self.discharge(False)
-
- def abort(self):
- self.discharge(True)
-
- def declare(self):
- self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
-
- def discharge(self, failed):
- self.failed = failed
- self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
-
- def _send_ctrl(self, descriptor, value):
- delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
- delivery.transaction = self
- return delivery
-
- def send(self, sender, msg, tag=None):
- dlv = sender.send(msg, tag=tag)
- dlv.local.data = [self.id]
- dlv.update(0x34)
- return dlv
-
- def accept(self, delivery):
- self.update(delivery, PN_ACCEPTED)
- if self.settle_before_discharge:
- delivery.settle()
- else:
- self._pending.append(delivery)
-
- def update(self, delivery, state=None):
- if state:
- delivery.local.data = [self.id, Described(ulong(state), [])]
- delivery.update(0x34)
-
- def _release_pending(self):
- for d in self._pending:
- d.update(Delivery.RELEASED)
- d.settle()
- self._clear_pending()
-
- def _clear_pending(self):
- self._pending = []
-
- def handle_outcome(self, event):
- if event.delivery == self._declare:
- if event.delivery.remote.data:
- self.id = event.delivery.remote.data[0]
- self.handler.on_transaction_declared(event)
- elif event.delivery.remote_state == Delivery.REJECTED:
- self.handler.on_transaction_declare_failed(event)
- else:
- log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
- self.handler.on_transaction_declare_failed(event)
- elif event.delivery == self._discharge:
- if event.delivery.remote_state == Delivery.REJECTED:
- if not self.failed:
- self.handler.on_transaction_commit_failed(event)
- self._release_pending() # make this optional?
- else:
- if self.failed:
- self.handler.on_transaction_aborted(event)
- self._release_pending()
- else:
- self.handler.on_transaction_committed(event)
- self._clear_pending()
-
-
-class LinkOption(object):
- """
- Abstract interface for link configuration options
- """
-
- def apply(self, link):
- """
- Subclasses will implement any configuration logic in this
- method
- """
- pass
-
- def test(self, link):
- """
- Subclasses can override this to selectively apply an option
- e.g. based on some link criteria
- """
- return True
-
-
-class AtMostOnce(LinkOption):
- def apply(self, link):
- link.snd_settle_mode = Link.SND_SETTLED
-
-
-class AtLeastOnce(LinkOption):
- def apply(self, link):
- link.snd_settle_mode = Link.SND_UNSETTLED
- link.rcv_settle_mode = Link.RCV_FIRST
-
-
-class SenderOption(LinkOption):
- def apply(self, sender): pass
-
- def test(self, link): return link.is_sender
-
-
-class ReceiverOption(LinkOption):
- def apply(self, receiver): pass
-
- def test(self, link): return link.is_receiver
-
-
-class DynamicNodeProperties(LinkOption):
- def __init__(self, props={}):
- self.properties = {}
- for k in props:
- if isinstance(k, symbol):
- self.properties[k] = props[k]
- else:
- self.properties[symbol(k)] = props[k]
-
- def apply(self, link):
- if link.is_receiver:
- link.source.properties.put_dict(self.properties)
- else:
- link.target.properties.put_dict(self.properties)
-
-
-class Filter(ReceiverOption):
- def __init__(self, filter_set={}):
- self.filter_set = filter_set
-
- def apply(self, receiver):
- receiver.source.filter.put_dict(self.filter_set)
-
-
-class Selector(Filter):
- """
- Configures a link with a message selector filter
- """
-
- def __init__(self, value, name='selector'):
- super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
-
-
-class DurableSubscription(ReceiverOption):
- def apply(self, receiver):
- receiver.source.durability = Terminus.DELIVERIES
- receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
-
-
-class Move(ReceiverOption):
- def apply(self, receiver):
- receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
-
-
-class Copy(ReceiverOption):
- def apply(self, receiver):
- receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
-
-
-def _apply_link_options(options, link):
- if options:
- if isinstance(options, list):
- for o in options:
- if o.test(link): o.apply(link)
- else:
- if options.test(link): options.apply(link)
-
-
-def _create_session(connection, handler=None):
- session = connection.session()
- session.open()
- return session
-
-
-def _get_attr(target, name):
- if hasattr(target, name):
- return getattr(target, name)
- else:
- return None
-
-
-class SessionPerConnection(object):
- def __init__(self):
- self._default_session = None
-
- def session(self, connection):
- if not self._default_session:
- self._default_session = _create_session(connection)
- return self._default_session
-
-
-class GlobalOverrides(object):
- """
- Internal handler that triggers the necessary socket connect for an
- opened connection.
- """
-
- def __init__(self, base):
- self.base = base
-
- def on_unhandled(self, name, event):
- if not self._override(event):
- event.dispatch(self.base)
-
- def _override(self, event):
- conn = event.connection
- return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
-
-
-class Connector(Handler):
- """
- Internal handler that triggers the necessary socket connect for an
- opened connection.
- """
-
- def __init__(self, connection):
- self.connection = connection
- self.address = None
- self.heartbeat = None
- self.reconnect = None
- self.ssl_domain = None
- self.allow_insecure_mechs = True
- self.allowed_mechs = None
- self.sasl_enabled = True
- self.user = None
- self.password = None
- self.virtual_host = None
- self.ssl_sni = None
- self.max_frame_size = None
-
- def _connect(self, connection, reactor):
- assert (reactor is not None)
- url = self.address.next()
- reactor.set_connection_host(connection, url.host, str(url.port))
- # if virtual-host not set, use host from address as default
- if self.virtual_host is None:
- connection.hostname = url.host
- log.debug("connecting to %r..." % url)
-
- transport = Transport()
- if self.sasl_enabled:
- sasl = transport.sasl()
- sasl.allow_insecure_mechs = self.allow_insecure_mechs
- if url.username:
- connection.user = url.username
- elif self.user:
- connection.user = self.user
- if url.password:
- connection.password = url.password
- elif self.password:
- connection.password = self.password
- if self.allowed_mechs:
- sasl.allowed_mechs(self.allowed_mechs)
- transport.bind(connection)
- if self.heartbeat:
- transport.idle_timeout = self.heartbeat
- if url.scheme == 'amqps':
- if not self.ssl_domain:
- raise SSLUnavailable("amqps: SSL libraries not found")
- self.ssl = SSL(transport, self.ssl_domain)
- self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
- if self.max_frame_size:
- transport.max_frame_size = self.max_frame_size
-
- def on_connection_local_open(self, event):
- self._connect(event.connection, event.reactor)
-
- def on_connection_remote_open(self, event):
- log.debug("connected to %s" % event.connection.hostname)
- if self.reconnect:
- self.reconnect.reset()
- self.transport = None
-
- def on_transport_tail_closed(self, event):
- self.on_transport_closed(event)
-
- def on_transport_closed(self, event):
- if self.connection is None: return
- if self.connection.state & Endpoint.LOCAL_ACTIVE:
- if self.reconnect:
- event.transport.unbind()
- delay = self.reconnect.next()
- if delay == 0:
- log.info("Disconnected, reconnecting...")
- self._connect(self.connection, event.reactor)
- return
- else:
- log.info("Disconnected will try to reconnect after %s seconds" % delay)
- event.reactor.schedule(delay, self)
- return
- else:
- log.debug("Disconnected")
- # See connector.cpp: conn.free()/pn_connection_release() here?
- self.connection = None
-
- def on_timer_task(self, event):
- self._connect(self.connection, event.reactor)
-
-
-class Backoff(object):
- """
- A reconnect strategy involving an increasing delay between
- retries, up to a maximum or 10 seconds.
- """
-
- def __init__(self):
- self.delay = 0
-
- def reset(self):
- self.delay = 0
-
- def next(self):
- current = self.delay
- if current == 0:
- self.delay = 0.1
- else:
- self.delay = min(10, 2 * current)
- return current
-
-
-class Urls(object):
- def __init__(self, values):
- self.values = [Url(v) for v in values]
- self.i = iter(self.values)
-
- def __iter__(self):
- return self
-
- def next(self):
- try:
- return next(self.i)
- except StopIteration:
- self.i = iter(self.values)
- return next(self.i)
-
-
-class SSLConfig(object):
- def __init__(self):
- self.client = SSLDomain(SSLDomain.MODE_CLIENT)
- self.server = SSLDomain(SSLDomain.MODE_SERVER)
-
- def set_credentials(self, cert_file, key_file, password):
- self.client.set_credentials(cert_file, key_file, password)
- self.server.set_credentials(cert_file, key_file, password)
-
- def set_trusted_ca_db(self, certificate_db):
- self.client.set_trusted_ca_db(certificate_db)
- self.server.set_trusted_ca_db(certificate_db)
-
-
-class Container(Reactor):
- """A representation of the AMQP concept of a 'container', which
- loosely speaking is something that establishes links to or from
- another container, over which messages are transfered. This is
- an extension to the Reactor class that adds convenience methods
- for creating connections and sender- or receiver- links.
- """
-
- def __init__(self, *handlers, **kwargs):
- super(Container, self).__init__(*handlers, **kwargs)
- if "impl" not in kwargs:
- try:
- self.ssl = SSLConfig()
- except SSLUnavailable:
- self.ssl = None
- self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
- self.trigger = None
- self.container_id = str(generate_uuid())
- self.allow_insecure_mechs = True
- self.allowed_mechs = None
- self.sasl_enabled = True
- self.user = None
- self.password = None
- Wrapper.__setattr__(self, 'subclass', self.__class__)
-
- def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
- **kwargs):
- """
- Initiates the establishment of an AMQP connection. Returns an
- instance of proton.Connection.
-
- @param url: URL string of process to connect to
-
- @param urls: list of URL strings of process to try to connect to
-
- Only one of url or urls should be specified.
-
- @param reconnect: Reconnect is enabled by default. You can
- pass in an instance of Backoff to control reconnect behavior.
- A value of False will prevent the library from automatically
- trying to reconnect if the underlying socket is disconnected
- before the connection has been closed.
-
- @param heartbeat: A value in milliseconds indicating the
- desired frequency of heartbeats used to test the underlying
- socket is alive.
-
- @param ssl_domain: SSL configuration in the form of an
- instance of proton.SSLDomain.
-
- @param handler: a connection scoped handler that will be
- called to process any events in the scope of this connection
- or its child links
-
- @param kwargs: 'sasl_enabled', which determines whether a sasl
- layer is used for the connection; 'allowed_mechs', an optional
- string containing a space-separated list of SASL mechanisms to
- allow if sasl is enabled; 'allow_insecure_mechs', a flag
- indicating whether insecure mechanisms, such as PLAIN over a
- non-encrypted socket, are allowed; 'virtual_host', the
- hostname to set in the Open performative used by peer to
- determine the correct back-end service for the client. If
- 'virtual_host' is not supplied the host field from the URL is
- used instead; 'user', the user to authenticate; 'password',
- the authentication secret.
-
- """
- conn = self.connection(handler)
- conn.container = self.container_id or str(generate_uuid())
- conn.offered_capabilities = kwargs.get('offered_capabilities')
- conn.desired_capabilities = kwargs.get('desired_capabilities')
- conn.properties = kwargs.get('properties')
-
- connector = Connector(conn)
- connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
- connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
- connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
- connector.user = kwargs.get('user', self.user)
- connector.password = kwargs.get('password', self.password)
- connector.virtual_host = kwargs.get('virtual_host')
- if connector.virtual_host:
- # only set hostname if virtual-host is a non-empty string
- conn.hostname = connector.virtual_host
- connector.ssl_sni = kwargs.get('sni')
- connector.max_frame_size = kwargs.get('max_frame_size')
-
- conn._overrides = connector
- if url:
- connector.address = Urls([url])
- elif urls:
- connector.address = Urls(urls)
- elif address:
- connector.address = address
- else:
- raise ValueError("One of url, urls or address required")
- if heartbeat:
- connector.heartbeat = heartbeat
- if reconnect:
- connector.reconnect = reconnect
- elif reconnect is None:
- connector.reconnect = Backoff()
- # use container's default client domain if none specified. This is
- # only necessary of the URL specifies the "amqps:" scheme
- connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
- conn._session_policy = SessionPerConnection() # todo: make configurable
- conn.open()
- return conn
-
- def _get_id(self, container, remote, local):
- if local and remote:
- "%s-%s-%s" % (container, remote, local)
- elif local:
- return "%s-%s" % (container, local)
- elif remote:
- return "%s-%s" % (container, remote)
- else:
- return "%s-%s" % (container, str(generate_uuid()))
-
- def _get_session(self, context):
- if isinstance(context, Url):
- return self._get_session(self.connect(url=context))
- elif isinstance(context, Session):
- return context
- elif isinstance(context, Connection):
- if hasattr(context, '_session_policy'):
- return context._session_policy.session(context)
- else:
- return _create_session(context)
- else:
- return context.session()
-
- def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
- """
- Initiates the establishment of a link over which messages can
- be sent. Returns an instance of proton.Sender.
-
- There are two patterns of use. (1) A connection can be passed
- as the first argument, in which case the link is established
- on that connection. In this case the target address can be
- specified as the second argument (or as a keyword
- argument). The source address can also be specified if
- desired. (2) Alternatively a URL can be passed as the first
- argument. In this case a new connection will be established on
- which the link will be attached. If a path is specified and
- the target is not, then the path of the URL is used as the
- target address.
-
- The name of the link may be specified if desired, otherwise a
- unique name will be generated.
-
- Various LinkOptions can be specified to further control the
- attachment.
- """
- if isstring(context):
- context = Url(context)
- if isinstance(context, Url) and not target:
- target = context.path
- session = self._get_session(context)
- snd = session.sender(name or self._get_id(session.connection.container, target, source))
- if source:
- snd.source.address = source
- if target:
- snd.target.address = target
- if handler != None:
- snd.handler = handler
- if tags:
- snd.tag_generator = tags
- _apply_link_options(options, snd)
- snd.open()
- return snd
-
- def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
- """
- Initiates the establishment of a link over which messages can
- be received (aka a subscription). Returns an instance of
- proton.Receiver.
-
- There are two patterns of use. (1) A connection can be passed
- as the first argument, in which case the link is established
- on that connection. In this case the source address can be
- specified as the second argument (or as a keyword
- argument). The target address can also be specified if
- desired. (2) Alternatively a URL can be passed as the first
- argument. In this case a new connection will be established on
- which the link will be attached. If a path is specified and
- the source is not, then the path of the URL is used as the
- target address.
-
- The name of the link may be specified if desired, otherwise a
- unique name will be generated.
-
- Various LinkOptions can be specified to further control the
- attachment.
- """
- if isstring(context):
- context = Url(context)
- if isinstance(context, Url) and not source:
- source = context.path
- session = self._get_session(context)
- rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
- if source:
- rcv.source.address = source
- if dynamic:
- rcv.source.dynamic = True
- if target:
- rcv.target.address = target
- if handler != None:
- rcv.handler = handler
- _apply_link_options(options, rcv)
- rcv.open()
- return rcv
-
- def declare_transaction(self, context, handler=None, settle_before_discharge=False):
- if not _get_attr(context, '_txn_ctrl'):
- class InternalTransactionHandler(OutgoingMessageHandler):
- def __init__(self):
- super(InternalTransactionHandler, self).__init__(auto_settle=True)
-
- def on_settled(self, event):
- if hasattr(event.delivery, "transaction"):
- event.transaction = event.delivery.transaction
- event.delivery.transaction.handle_outcome(event)
-
- def on_unhandled(self, method, event):
- if handler:
- event.dispatch(handler)
-
- context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
- context._txn_ctrl.target.type = Terminus.COORDINATOR
- context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
- return Transaction(context._txn_ctrl, handler, settle_before_discharge)
-
- def listen(self, url, ssl_domain=None):
- """
- Initiates a server socket, accepting incoming AMQP connections
- on the interface and port specified.
- """
- url = Url(url)
- acceptor = self.acceptor(url.host, url.port)
- ssl_config = ssl_domain
- if not ssl_config and url.scheme == 'amqps':
- # use container's default server domain
- if self.ssl:
- ssl_config = self.ssl.server
- else:
- raise SSLUnavailable("amqps: SSL libraries not found")
- if ssl_config:
- acceptor.set_ssl_domain(ssl_config)
- return acceptor
-
- def do_work(self, timeout=None):
- if timeout:
- self.timeout = timeout
- return self.process()
+from ._reactor import Container, ApplicationEvent, EventInjector, Handler,\
+ AtLeastOnce, AtMostOnce, DynamicNodeProperties, Filter, Selector, DurableSubscription, Copy, Move,\
+ Reactor
+
+__all__ = [
+ 'Container',
+ 'ApplicationEvent',
+ 'EventInjector',
+ 'Handler',
+ 'AtLeastOnce',
+ 'AtMostOnce',
+ 'DynamicNodeProperties',
+ 'Filter',
+ 'Selector',
+ 'DurableSubscription',
+ 'Copy',
+ 'Move',
+ 'Reactor'
+]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a566808/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/python/proton/utils.py b/python/proton/utils.py
index fd8f57a..270e534 100644
--- a/python/proton/utils.py
+++ b/python/proton/utils.py
@@ -19,403 +19,12 @@
from __future__ import absolute_import
-import collections
-import time
-import threading
-
-from cproton import pn_reactor_collector, pn_collector_release
-
-from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
-from ._delivery import Delivery
-from ._endpoints import Endpoint, Link
-from ._events import Handler
-from ._url import Url
-
-from .reactor import Container
-from .handlers import MessagingHandler, IncomingMessageHandler
-
-
-class BlockingLink(object):
- def __init__(self, connection, link):
- self.connection = connection
- self.link = link
- self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
- msg="Opening link %s" % link.name)
- self._checkClosed()
-
- def _waitForClose(self, timeout=1):
- try:
- self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
- timeout=timeout,
- msg="Opening link %s" % self.link.name)
- except Timeout as e:
- pass
- self._checkClosed()
-
- def _checkClosed(self):
- if self.link.state & Endpoint.REMOTE_CLOSED:
- self.link.close()
- if not self.connection.closing:
- raise LinkDetached(self.link)
-
- def close(self):
- self.link.close()
- self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
- msg="Closing link %s" % self.link.name)
-
- # Access to other link attributes.
- def __getattr__(self, name):
- return getattr(self.link, name)
-
-
-class SendException(ProtonException):
- """
- Exception used to indicate an exceptional state/condition on a send request
- """
-
- def __init__(self, state):
- self.state = state
-
-
-def _is_settled(delivery):
- return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
-
-
-class BlockingSender(BlockingLink):
- def __init__(self, connection, sender):
- super(BlockingSender, self).__init__(connection, sender)
- if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
- # this may be followed by a detach, which may contain an error condition, so wait a little...
- self._waitForClose()
- # ...but close ourselves if peer does not
- self.link.close()
- raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
-
- def send(self, msg, timeout=False, error_states=None):
- delivery = self.link.send(msg)
- self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
- timeout=timeout)
- if delivery.link.snd_settle_mode != Link.SND_SETTLED:
- delivery.settle()
- bad = error_states
- if bad is None:
- bad = [Delivery.REJECTED, Delivery.RELEASED]
- if delivery.remote_state in bad:
- raise SendException(delivery.remote_state)
- return delivery
-
-
-class Fetcher(MessagingHandler):
- def __init__(self, connection, prefetch):
- super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
- self.connection = connection
- self.incoming = collections.deque([])
- self.unsettled = collections.deque([])
-
- def on_message(self, event):
- self.incoming.append((event.message, event.delivery))
- self.connection.container.yield_() # Wake up the wait() loop to handle the message.
-
- def on_link_error(self, event):
- if event.link.state & Endpoint.LOCAL_ACTIVE:
- event.link.close()
- if not self.connection.closing:
- raise LinkDetached(event.link)
-
- def on_connection_error(self, event):
- if not self.connection.closing:
- raise ConnectionClosed(event.connection)
-
- @property
- def has_message(self):
- return len(self.incoming)
-
- def pop(self):
- message, delivery = self.incoming.popleft()
- if not delivery.settled:
- self.unsettled.append(delivery)
- return message
-
- def settle(self, state=None):
- delivery = self.unsettled.popleft()
- if state:
- delivery.update(state)
- delivery.settle()
-
-
-class BlockingReceiver(BlockingLink):
- def __init__(self, connection, receiver, fetcher, credit=1):
- super(BlockingReceiver, self).__init__(connection, receiver)
- if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
- # this may be followed by a detach, which may contain an error condition, so wait a little...
- self._waitForClose()
- # ...but close ourselves if peer does not
- self.link.close()
- raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
- if credit: receiver.flow(credit)
- self.fetcher = fetcher
- self.container = connection.container
-
- def __del__(self):
- self.fetcher = None
- # The next line causes a core dump if the Proton-C reactor finalizes
- # first. The self.container reference prevents out of order reactor
- # finalization. It may not be set if exception in BlockingLink.__init__
- if hasattr(self, "container"):
- self.link.handler = None # implicit call to reactor
-
- def receive(self, timeout=False):
- if not self.fetcher:
- raise Exception("Can't call receive on this receiver as a handler was provided")
- if not self.link.credit:
- self.link.flow(1)
- self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
- timeout=timeout)
- return self.fetcher.pop()
-
- def accept(self):
- self.settle(Delivery.ACCEPTED)
-
- def reject(self):
- self.settle(Delivery.REJECTED)
-
- def release(self, delivered=True):
- if delivered:
- self.settle(Delivery.MODIFIED)
- else:
- self.settle(Delivery.RELEASED)
-
- def settle(self, state=None):
- if not self.fetcher:
- raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
- self.fetcher.settle(state)
-
-
-class LinkDetached(LinkException):
- def __init__(self, link):
- self.link = link
- if link.is_sender:
- txt = "sender %s to %s closed" % (link.name, link.target.address)
- else:
- txt = "receiver %s from %s closed" % (link.name, link.source.address)
- if link.remote_condition:
- txt += " due to: %s" % link.remote_condition
- self.condition = link.remote_condition.name
- else:
- txt += " by peer"
- self.condition = None
- super(LinkDetached, self).__init__(txt)
-
-
-class ConnectionClosed(ConnectionException):
- def __init__(self, connection):
- self.connection = connection
- txt = "Connection %s closed" % connection.hostname
- if connection.remote_condition:
- txt += " due to: %s" % connection.remote_condition
- self.condition = connection.remote_condition.name
- else:
- txt += " by peer"
- self.condition = None
- super(ConnectionClosed, self).__init__(txt)
-
-
-class BlockingConnection(Handler):
- """
- A synchronous style connection wrapper.
-
- This object's implementation uses OS resources. To ensure they
- are released when the object is no longer in use, make sure that
- object operations are enclosed in a try block and that close() is
- always executed on exit.
- """
-
- def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
- self.disconnected = False
- self.timeout = timeout or 60
- self.container = container or Container()
- self.container.timeout = self.timeout
- self.container.start()
- self.url = Url(url).defaults()
- self.conn = None
- self.closing = False
- failed = True
- try:
- self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
- heartbeat=heartbeat, **kwargs)
- self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
- msg="Opening connection")
- failed = False
- finally:
- if failed and self.conn:
- self.close()
-
- def create_sender(self, address, handler=None, name=None, options=None):
- return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
- options=options))
-
- def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
- prefetch = credit
- if handler:
- fetcher = None
- if prefetch is None:
- prefetch = 1
- else:
- fetcher = Fetcher(self, credit)
- return BlockingReceiver(
- self,
- self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
- options=options), fetcher, credit=prefetch)
-
- def close(self):
- # TODO: provide stronger interrupt protection on cleanup. See PEP 419
- if self.closing:
- return
- self.closing = True
- self.container.errors = []
- try:
- if self.conn:
- self.conn.close()
- self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
- msg="Closing connection")
- finally:
- self.conn.free()
- # Nothing left to block on. Allow reactor to clean up.
- self.run()
- self.conn = None
- self.container.global_handler = None # break circular ref: container to cadapter.on_error
- pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
- self.container = None
-
- def _is_closed(self):
- return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
-
- def run(self):
- """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
- while self.container.process(): pass
- self.container.stop()
- self.container.process()
-
- def wait(self, condition, timeout=False, msg=None):
- """Call process until condition() is true"""
- if timeout is False:
- timeout = self.timeout
- if timeout is None:
- while not condition() and not self.disconnected:
- self.container.process()
- else:
- container_timeout = self.container.timeout
- self.container.timeout = timeout
- try:
- deadline = time.time() + timeout
- while not condition() and not self.disconnected:
- self.container.process()
- if deadline < time.time():
- txt = "Connection %s timed out" % self.url
- if msg: txt += ": " + msg
- raise Timeout(txt)
- finally:
- self.container.timeout = container_timeout
- if self.disconnected or self._is_closed():
- self.container.stop()
- self.conn.handler = None # break cyclical reference
- if self.disconnected and not self._is_closed():
- raise ConnectionException(
- "Connection %s disconnected: %s" % (self.url, self.disconnected))
-
- def on_link_remote_close(self, event):
- if event.link.state & Endpoint.LOCAL_ACTIVE:
- event.link.close()
- if not self.closing:
- raise LinkDetached(event.link)
-
- def on_connection_remote_close(self, event):
- if event.connection.state & Endpoint.LOCAL_ACTIVE:
- event.connection.close()
- if not self.closing:
- raise ConnectionClosed(event.connection)
-
- def on_transport_tail_closed(self, event):
- self.on_transport_closed(event)
-
- def on_transport_head_closed(self, event):
- self.on_transport_closed(event)
-
- def on_transport_closed(self, event):
- self.disconnected = event.transport.condition or "unknown"
-
-
-class AtomicCount(object):
- def __init__(self, start=0, step=1):
- """Thread-safe atomic counter. Start at start, increment by step."""
- self.count, self.step = start, step
- self.lock = threading.Lock()
-
- def next(self):
- """Get the next value"""
- self.lock.acquire()
- self.count += self.step;
- result = self.count
- self.lock.release()
- return result
-
-
-class SyncRequestResponse(IncomingMessageHandler):
- """
- Implementation of the synchronous request-response (aka RPC) pattern.
- @ivar address: Address for all requests, may be None.
- @ivar connection: Connection for requests and responses.
- """
-
- correlation_id = AtomicCount()
-
- def __init__(self, connection, address=None):
- """
- Send requests and receive responses. A single instance can send many requests
- to the same or different addresses.
-
- @param connection: A L{BlockingConnection}
- @param address: Address for all requests.
- If not specified, each request must have the address property set.
- Successive messages may have different addresses.
- """
- super(SyncRequestResponse, self).__init__()
- self.connection = connection
- self.address = address
- self.sender = self.connection.create_sender(self.address)
- # dynamic=true generates a unique address dynamically for this receiver.
- # credit=1 because we want to receive 1 response message initially.
- self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
- self.response = None
-
- def call(self, request):
- """
- Send a request message, wait for and return the response message.
-
- @param request: A L{proton.Message}. If L{self.address} is not set the
- L{self.address} must be set and will be used.
- """
- if not self.address and not request.address:
- raise ValueError("Request message has no address: %s" % request)
- request.reply_to = self.reply_to
- request.correlation_id = correlation_id = str(self.correlation_id.next())
- self.sender.send(request)
-
- def wakeup():
- return self.response and (self.response.correlation_id == correlation_id)
-
- self.connection.wait(wakeup, msg="Waiting for response")
- response = self.response
- self.response = None # Ready for next response.
- self.receiver.flow(1) # Set up credit for the next response.
- return response
-
- @property
- def reply_to(self):
- """Return the dynamic address of our receiver."""
- return self.receiver.remote_source.address
-
- def on_message(self, event):
- """Called when we receive a message for our receiver."""
- self.response = event.message
- self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+from ._utils import BlockingConnection, SyncRequestResponse, SendException, LinkDetached, ConnectionClosed
+
+__all__ = [
+ 'BlockingConnection',
+ 'SyncRequestResponse',
+ 'SendException',
+ 'LinkDetached',
+ 'ConnectionClosed'
+]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org