You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2019/02/25 18:40:18 UTC
[qpid-proton] 01/06: PROTON-1992: [Python] Remove dependency on
Proton Reactor API - Python binding now only uses APIs from Proton Core
library. It uses Python APIs to do all IO and uses Proton purely to process
the AMQP protocol. - It is very compatible with the existing higher level
Python APIs. [In modules proton, proton.reactor, proton.handlers,
proton.utils]
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 1d6e14f8bb077f584afbe5419aab7ed78d422f9b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jan 15 16:41:57 2019 -0500
PROTON-1992: [Python] Remove dependency on Proton Reactor API
- Python binding now only uses APIs from Proton Core library.
It uses Python APIs to do all IO and uses Proton purely to process
the AMQP protocol.
- It is highly compatible with the existing higher-level Python APIs.
[In modules proton, proton.reactor, proton.handlers, proton.utils]
- Passes the Python tests as well as before
- Works with Python 2 and Python 3
- Works on Unix and Windows
- Runs all the Python examples
---
python/CMakeLists.txt | 3 +-
python/proton/_endpoints.py | 56 ++---
python/proton/_events.py | 250 ++++++++++++--------
python/proton/_handlers.py | 214 +++++++++++++++--
python/proton/_io.py | 138 +++++++++++
python/proton/_message.py | 13 +-
python/proton/_reactor.py | 438 ++++++++++++++++++++++-------------
python/proton/_reactor_impl.py | 217 -----------------
python/proton/_selectable.py | 93 ++++++++
python/proton/_transport.py | 5 +
python/proton/_utils.py | 4 +-
python/proton/_wrapper.py | 15 ++
python/tests/proton_tests/handler.py | 2 +-
python/tests/proton_tests/reactor.py | 8 +-
14 files changed, 922 insertions(+), 534 deletions(-)
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index c9e659e..a02c401 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -72,6 +72,7 @@ set (pysrc
proton/_endpoints.py
proton/_events.py
proton/_exceptions.py
+ proton/_io.py
proton/_message.py
proton/_transport.py
proton/_url.py
@@ -83,7 +84,7 @@ set (pysrc
proton/_handlers.py
proton/_reactor.py
- proton/_reactor_impl.py
+ proton/_selectable.py
proton/_utils.py
)
# extra files included in the source distribution
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index bf72727..e873710 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -27,7 +27,6 @@ import weakref
from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \
PN_REMOTE_CLOSED, \
- pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \
pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \
pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \
pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \
@@ -81,6 +80,7 @@ class Endpoint(object):
def _init(self):
self.condition = None
+ self._handler = None
def _update_cond(self):
obj2cond(self.condition, self._get_cond_impl())
@@ -97,35 +97,21 @@ class Endpoint(object):
assert False, "Subclass must override this!"
def _get_handler(self):
- from . import _reactor
- from . import _reactor_impl
- ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
- if ractor:
- on_error = ractor.on_error_delegate()
- else:
- on_error = None
- record = self._get_attachments()
- return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
+ return self._handler
def _set_handler(self, handler):
- from . import _reactor
- from . import _reactor_impl
- ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
- if ractor:
- on_error = ractor.on_error_delegate()
+ # TODO Hack This is here for some very odd (IMO) backwards compat behaviour
+ from ._events import Handler
+ if handler is None:
+ self._handler = None
+ elif issubclass(type(handler), Handler):
+ self._handler = handler
else:
- on_error = None
- impl = _reactor_impl._chandler(handler, on_error)
- record = self._get_attachments()
- pn_record_set_handler(record, impl)
- pn_decref(impl)
+ self._handler = Handler()
+ self._handler.add(handler)
handler = property(_get_handler, _set_handler)
- @property
- def transport(self):
- return self.connection.transport
-
class Connection(Wrapper, Endpoint):
"""
@@ -147,6 +133,8 @@ class Connection(Wrapper, Endpoint):
self.offered_capabilities = None
self.desired_capabilities = None
self.properties = None
+ self.url = None
+ self._acceptor = None
def _get_attachments(self):
return pn_connection_attachments(self._impl)
@@ -183,7 +171,7 @@ class Connection(Wrapper, Endpoint):
return utf82unicode(pn_connection_get_container(self._impl))
def _set_container(self, name):
- return pn_connection_set_container(self._impl, unicode2utf8(name))
+ pn_connection_set_container(self._impl, unicode2utf8(name))
container = property(_get_container, _set_container)
@@ -191,7 +179,7 @@ class Connection(Wrapper, Endpoint):
return utf82unicode(pn_connection_get_hostname(self._impl))
def _set_hostname(self, name):
- return pn_connection_set_hostname(self._impl, unicode2utf8(name))
+ pn_connection_set_hostname(self._impl, unicode2utf8(name))
hostname = property(_get_hostname, _set_hostname,
doc="""
@@ -206,7 +194,7 @@ and SASL layers to identify the peer.
return utf82unicode(pn_connection_get_user(self._impl))
def _set_user(self, name):
- return pn_connection_set_user(self._impl, unicode2utf8(name))
+ pn_connection_set_user(self._impl, unicode2utf8(name))
user = property(_get_user, _set_user)
@@ -214,7 +202,7 @@ and SASL layers to identify the peer.
return None
def _set_password(self, name):
- return pn_connection_set_password(self._impl, unicode2utf8(name))
+ pn_connection_set_password(self._impl, unicode2utf8(name))
password = property(_get_password, _set_password)
@@ -243,6 +231,10 @@ and SASL layers to identify the peer.
"""The properties specified by the remote peer for this connection."""
return dat2obj(pn_connection_remote_properties(self._impl))
+ @property
+ def connected_address(self):
+ return self.url and str(self.url)
+
def open(self):
"""
Opens the connection.
@@ -374,6 +366,10 @@ class Session(Wrapper, Endpoint):
def connection(self):
return Connection.wrap(pn_session_connection(self._impl))
+ @property
+ def transport(self):
+ return self.connection.transport
+
def sender(self, name):
return Sender(pn_sender(self._impl, unicode2utf8(name)))
@@ -486,6 +482,10 @@ class Link(Wrapper, Endpoint):
"""The connection on which this link was attached."""
return self.session.connection
+ @property
+ def transport(self):
+ return self.session.transport
+
def delivery(self, tag):
return Delivery(pn_delivery(self._impl, tag))
diff --git a/python/proton/_events.py b/python/proton/_events.py
index c6d5459..d322b2e 100644
--- a/python/proton/_events.py
+++ b/python/proton/_events.py
@@ -22,26 +22,23 @@ from __future__ import absolute_import
import threading
from cproton import PN_SESSION_REMOTE_CLOSE, PN_SESSION_FINAL, pn_event_context, pn_collector_put, \
- PN_SELECTABLE_UPDATED, pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_attachments, pn_event_type, \
- pn_collector_free, pn_handler_dispatch, PN_SELECTABLE_WRITABLE, PN_SELECTABLE_INIT, PN_SESSION_REMOTE_OPEN, \
- pn_collector_peek, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \
+ pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_type, \
+ pn_collector_free, pn_collector_release, PN_SESSION_REMOTE_OPEN, \
+ pn_collector_peek, pn_collector_more, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \
PN_TRANSPORT_ERROR, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_LOCAL_CLOSE, pn_event_delivery, \
- PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, pn_event_reactor, \
+ PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, \
PN_CONNECTION_REMOTE_CLOSE, pn_collector_pop, PN_LINK_INIT, pn_event_link, PN_CONNECTION_UNBOUND, \
- pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, PN_REACTOR_INIT, PN_REACTOR_QUIESCED, \
- PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name, PN_SELECTABLE_READABLE, \
- pn_event_transport, PN_TRANSPORT_TAIL_CLOSED, PN_SELECTABLE_FINAL, PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \
- PN_SESSION_LOCAL_CLOSE, pn_event_copy, PN_REACTOR_FINAL, PN_LINK_LOCAL_OPEN, PN_SELECTABLE_EXPIRED, \
- PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, pn_event_root, PN_SELECTABLE_ERROR, \
+ pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, \
+ PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name,\
+ pn_event_transport, PN_TRANSPORT_TAIL_CLOSED,PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \
+ PN_SESSION_LOCAL_CLOSE, PN_LINK_LOCAL_OPEN, \
+ PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, \
PN_CONNECTION_INIT, pn_event_class, pn_void2py, pn_cast_pn_session, pn_cast_pn_link, pn_cast_pn_delivery, \
- pn_cast_pn_transport, pn_cast_pn_connection, pn_cast_pn_selectable
+ pn_cast_pn_transport, pn_cast_pn_connection
-from ._common import Constant
from ._delivery import Delivery
from ._endpoints import Connection, Session, Link
-from ._reactor_impl import Selectable, WrappedHandler
from ._transport import Transport
-from ._wrapper import Wrapper
class Collector:
@@ -55,10 +52,16 @@ class Collector:
def peek(self):
return Event.wrap(pn_collector_peek(self._impl))
+ def more(self):
+ return pn_collector_more(self._impl)
+
def pop(self):
ev = self.peek()
pn_collector_pop(self._impl)
+ def release(self):
+ pn_collector_release(self._impl)
+
def __del__(self):
pn_collector_free(self._impl)
del self._impl
@@ -104,38 +107,52 @@ class EventType(object):
self._lock.release()
def __repr__(self):
+ return "EventType(name=%s, number=%d)" % (self.name, self.number)
+
+ def __str__(self):
return self.name
def _dispatch(handler, method, *args):
m = getattr(handler, method, None)
if m:
- return m(*args)
+ m(*args)
elif hasattr(handler, "on_unhandled"):
- return handler.on_unhandled(method, *args)
+ handler.on_unhandled(method, *args)
class EventBase(object):
- def __init__(self, clazz, context, type):
- self.clazz = clazz
- self.context = context
- self.type = type
-
- def dispatch(self, handler):
- return _dispatch(handler, self.type.method, self)
+ def __init__(self, type):
+ self._type = type
+ @property
+ def type(self):
+ return self._type
-def _none(x): return None
+ @property
+ def handler(self):
+ return None
+ def dispatch(self, handler, type=None):
+ type = type or self._type
+ _dispatch(handler, type.method, self)
+ if hasattr(handler, "handlers"):
+ for h in handler.handlers:
+ self.dispatch(h, type)
-DELEGATED = Constant("DELEGATED")
+ def __repr__(self):
+ return "%s(%r)" % (self._type, self.context)
def _core(number, method):
return EventType(number=number, method=method)
+def _internal(name):
+ return EventType(name=name)
+
+
wrappers = {
"pn_void": lambda x: pn_void2py(x),
"pn_pyref": lambda x: pn_void2py(x),
@@ -143,16 +160,11 @@ wrappers = {
"pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
"pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
"pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
- "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
- "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
+ "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x))
}
-class Event(Wrapper, EventBase):
- REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
- REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
- REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
-
+class Event(EventBase):
TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
@@ -189,107 +201,159 @@ class Event(Wrapper, EventBase):
TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
- SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
- SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
- SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
- SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
- SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
- SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
- SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
+ # These events are now internal events in the python code
+ REACTOR_INIT = _internal("reactor_init")
+ REACTOR_QUIESCED = _internal("reactor_quiesced")
+ REACTOR_FINAL = _internal("reactor_final")
+
+ SELECTABLE_INIT = _internal("selectable_init")
+ SELECTABLE_UPDATED = _internal("selectable_updated")
+ SELECTABLE_READABLE = _internal("selectable_readable")
+ SELECTABLE_WRITABLE = _internal("selectable_writable")
+ SELECTABLE_EXPIRED = _internal("selectable_expired")
+ SELECTABLE_ERROR = _internal("selectable_error")
+ SELECTABLE_FINAL = _internal("selectable_final")
@staticmethod
- def wrap(impl, number=None):
+ def wrap(impl):
if impl is None:
return None
- if number is None:
- number = pn_event_type(impl)
+ number = pn_event_type(impl)
+ cls = pn_event_class(impl)
- event = Event(impl, number)
+ if cls:
+ clsname = pn_class_name(cls)
+ context = wrappers[clsname](pn_event_context(impl))
- # check for an application defined ApplicationEvent and return that. This
- # avoids an expensive wrap operation invoked by event.context
- if pn_event_class(impl) == PN_PYREF and \
- isinstance(event.context, EventBase):
- return event.context
+ # check for an application defined ApplicationEvent and return that. This
+ # avoids an expensive wrap operation invoked by event.context
+ if cls == PN_PYREF and isinstance(context, EventBase):
+ return context
else:
- return event
+ clsname = None
- def __init__(self, impl, number):
- Wrapper.__init__(self, impl, pn_event_attachments)
- self.__dict__["type"] = EventType.TYPES[number]
+ event = Event(impl, number, clsname, context)
+ return event
- def _init(self):
- pass
+ def __init__(self, impl, number, clsname, context):
+ self._type = EventType.TYPES[number]
+ self._clsname = clsname
+ self._context = context
- def copy(self):
- copy = pn_event_copy(self._impl)
- return Event.wrap(copy)
+ # Do all this messing around to avoid duplicate wrappers
+ if issubclass(type(context), Delivery):
+ self._delivery = context
+ else:
+ self._delivery = Delivery.wrap(pn_event_delivery(impl))
+ if self._delivery:
+ self._link = self._delivery.link
+ elif issubclass(type(context), Link):
+ self._link = context
+ else:
+ self._link = Link.wrap(pn_event_link(impl))
+ if self._link:
+ self._session = self._link.session
+ elif issubclass(type(context), Session):
+ self._session = context
+ else:
+ self._session = Session.wrap(pn_event_session(impl))
+ if self._session:
+ self._connection = self._session.connection
+ elif issubclass(type(context), Connection):
+ self._connection = context
+ else:
+ self._connection = Connection.wrap(pn_event_connection(impl))
- @property
- def clazz(self):
- cls = pn_event_class(self._impl)
- if cls:
- return pn_class_name(cls)
+ if issubclass(type(context), Transport):
+ self._transport = context
else:
- return None
+ self._transport = Transport.wrap(pn_event_transport(impl))
@property
- def root(self):
- return WrappedHandler.wrap(pn_event_root(self._impl))
+ def clazz(self):
+ return self._clsname
@property
def context(self):
- """Returns the context object associated with the event. The type of this depend on the type of event."""
- return wrappers[self.clazz](pn_event_context(self._impl))
+ """Returns the context object associated with the event. The type of this depends on the type of event."""
+ return self._context
- def dispatch(self, handler, type=None):
- type = type or self.type
- if isinstance(handler, WrappedHandler):
- pn_handler_dispatch(handler._impl, self._impl, type.number)
- else:
- result = _dispatch(handler, type.method, self)
- if result != DELEGATED and hasattr(handler, "handlers"):
- for h in handler.handlers:
- self.dispatch(h, type)
+ @property
+ def handler(self):
+ l = self.link
+ if l:
+ h = l.handler
+ if h:
+ return h
+ s = self.session
+ if s:
+ h = s.handler
+ if h:
+ return h
+ c = self.connection
+ if c:
+ h = c.handler
+ if h:
+ return h
+ c = self.context
+ if not c or not hasattr(c, 'handler'):
+ return None
+ h = c.handler
+ return h
@property
def reactor(self):
- """Returns the reactor associated with the event."""
- return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
+ """
+ Deprecated: Returns the container (was reactor) associated with the event.
+ """
+ return self.container
+
+ @property
+ def container(self):
+ """
+ Returns the container associated with the event.
+ """
+ return self._transport._reactor
def __getattr__(self, name):
- r = self.reactor
- if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
- return r
- else:
- return super(Event, self).__getattr__(name)
+ """
+ This will look for a property of the event as an attached context object of the same
+ type as the property (but lowercase)
+ """
+ c = self.context
+ # Direct type or subclass of type
+ if type(c).__name__.lower() == name or name in [x.__name__.lower() for x in type(c).__bases__]:
+ return c
+
+ # If the attached object is the wrong type then see if *it* has a property of that name
+ return getattr(c, name, None)
@property
def transport(self):
"""Returns the transport associated with the event, or null if none is associated with it."""
- return Transport.wrap(pn_event_transport(self._impl))
+ return self._transport
@property
def connection(self):
"""Returns the connection associated with the event, or null if none is associated with it."""
- return Connection.wrap(pn_event_connection(self._impl))
+ return self._connection
@property
def session(self):
"""Returns the session associated with the event, or null if none is associated with it."""
- return Session.wrap(pn_event_session(self._impl))
+ return self._session
@property
def link(self):
"""Returns the link associated with the event, or null if none is associated with it."""
- return Link.wrap(pn_event_link(self._impl))
+ return self._link
@property
def sender(self):
"""Returns the sender link associated with the event, or null if
none is associated with it. This is essentially an alias for
- link(), that does an additional checkon the type of the
+ link(), that does an additional check on the type of the
link."""
l = self.link
if l and l.is_sender:
@@ -301,7 +365,7 @@ class Event(Wrapper, EventBase):
def receiver(self):
"""Returns the receiver link associated with the event, or null if
none is associated with it. This is essentially an alias for
- link(), that does an additional checkon the type of the link."""
+ link(), that does an additional check on the type of the link."""
l = self.link
if l and l.is_receiver:
return l
@@ -311,10 +375,7 @@ class Event(Wrapper, EventBase):
@property
def delivery(self):
"""Returns the delivery associated with the event, or null if none is associated with it."""
- return Delivery.wrap(pn_event_delivery(self._impl))
-
- def __repr__(self):
- return "%s(%s)" % (self.type, self.context)
+ return self._delivery
class LazyHandlers(object):
@@ -329,5 +390,10 @@ class LazyHandlers(object):
class Handler(object):
handlers = LazyHandlers()
+ # TODO What to do with on_error?
+ def add(self, handler, on_error=None):
+ """Add a child handler"""
+ self.handlers.append(handler)
+
def on_unhandled(self, method, *args):
pass
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index f8d5413..c946a3d 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -22,13 +22,15 @@ from __future__ import absolute_import
import logging
import time
import weakref
-from select import select
from ._delivery import Delivery
from ._endpoints import Endpoint
-from ._message import Message
-from ._exceptions import ProtonException
from ._events import Handler, _dispatch
+from ._exceptions import ProtonException
+from ._io import IO
+from ._message import Message
+from ._transport import Transport
+from ._url import Url
log = logging.getLogger("proton")
@@ -672,15 +674,6 @@ CFlowController = FlowController
CHandshaker = Handshaker
-from ._reactor_impl import WrappedHandler
-from cproton import pn_iohandler
-
-class IOHandler(WrappedHandler):
-
- def __init__(self):
- WrappedHandler.__init__(self, pn_iohandler)
-
-
class PythonIO:
def __init__(self):
@@ -726,13 +719,11 @@ class PythonIO:
timeout = deadline - time.time()
else:
timeout = reactor.timeout
- if (timeout < 0): timeout = 0
+ if timeout < 0: timeout = 0
timeout = min(timeout, reactor.timeout)
- readable, writable, _ = select(reading, writing, [], timeout)
+ readable, writable, _ = IO.select(reading, writing, [], timeout)
- reactor.mark()
-
- now = time.time()
+ now = reactor.mark()
for s in readable:
s.readable()
@@ -743,3 +734,192 @@ class PythonIO:
s.expired()
reactor.yield_()
+
+
+# For C style IO handler need to implement Selector
+class IOHandler(Handler):
+
+ def __init__(self):
+ self._selector = IO.Selector()
+
+ def on_selectable_init(self, event):
+ s = event.selectable
+ self._selector.add(s)
+ s._reactor._selectables += 1
+
+ def on_selectable_updated(self, event):
+ s = event.selectable
+ self._selector.update(s)
+
+ def on_selectable_final(self, event):
+ s = event.selectable
+ self._selector.remove(s)
+ s._reactor._selectables -= 1
+ s.release()
+
+ def on_reactor_quiesced(self, event):
+ r = event.reactor
+
+ if not r.quiesced:
+ return
+
+ d = r.timer_deadline
+ readable, writable, expired = self._selector.select(r.timeout)
+
+ now = r.mark()
+
+ for s in readable:
+ s.readable()
+ for s in writable:
+ s.writable()
+ for s in expired:
+ s.expired()
+
+ r.yield_()
+
+ def on_selectable_readable(self, event):
+ s = event.selectable
+ t = s._transport
+
+ # If we're an acceptor we can't have a transport
+ # and we don't want to do anything here in any case
+ if not t:
+ return
+
+ capacity = t.capacity()
+ if capacity > 0:
+ try:
+ b = s.recv(capacity)
+ if len(b) > 0:
+ n = t.push(b)
+ else:
+ # EOF handling
+ self.on_selectable_error(event)
+ except:
+ # TODO: What's the error handling to be here?
+ t.close_tail()
+
+ # Always update as we may have gone to not reading or from
+ # not writing to writing when processing the incoming bytes
+ r = s._reactor
+ self.update(t, s, r.now)
+
+ def on_selectable_writable(self, event):
+ s = event.selectable
+ t = s._transport
+
+ # If we're an acceptor we can't have a transport
+ # and we don't want to do anything here in any case
+ if not t:
+ return
+
+ pending = t.pending()
+ if pending > 0:
+
+ try:
+ n = s.send(t.peek(pending))
+ t.pop(n)
+ except:
+ # TODO: Error? or actually an exception
+ t.close_head()
+
+ newpending = t.pending()
+ if newpending != pending:
+ r = s._reactor
+ self.update(t, s, r.now)
+
+ def on_selectable_error(self, event):
+ s = event.selectable
+ t = s._transport
+
+ t.close_head()
+ t.close_tail()
+ s.terminate()
+ s.update()
+
+ def on_selectable_expired(self, event):
+ s = event.selectable
+ t = s._transport
+ r = s._reactor
+
+ self.update(t, s, r.now)
+
+ def on_connection_local_open(self, event):
+ c = event.connection
+ if not c.state & Endpoint.REMOTE_UNINIT:
+ return
+
+ t = Transport()
+ # It seems perverse, but the C code ignores bind errors too!
+ # and this is required or you get errors because Connector() has already
+ # bound the transport and connection!
+ t.bind_nothrow(c)
+
+ def on_connection_bound(self, event):
+ c = event.connection
+ t = event.transport
+
+ reactor = c._reactor
+
+ # link the new transport to its reactor:
+ t._reactor = reactor
+
+ if c._acceptor:
+ # this connection was created by the acceptor. There is already a
+ # socket assigned to this connection. Nothing needs to be done.
+ return
+
+ url = c.url or Url(c.hostname)
+ url.defaults()
+
+ host = url.host
+ port = url.port
+
+ if not c.user:
+ user = url.username
+ if user:
+ c.user = user
+ password = url.password
+ if password:
+ c.password = password
+
+ # TODO Currently this is synch and will throw if it cannot connect
+ # do we want to handle errors differently? or do it asynch?
+ sock = IO.connect(host, int(port))
+
+ s = reactor.selectable(delegate=sock)
+ s._transport = t
+ t._selectable = s
+ self.update(t, s, reactor.now)
+
+ @staticmethod
+ def update(transport, selectable, now):
+ try:
+ capacity = transport.capacity()
+ selectable.reading = capacity>0
+ except:
+ if transport.closed:
+ selectable.terminate()
+ try:
+ pending = transport.pending()
+ selectable.writing = pending>0
+ except:
+ if transport.closed:
+ selectable.terminate()
+ selectable.deadline = transport.tick(now)
+ selectable.update()
+
+ def on_transport(self, event):
+ t = event.transport
+ r = t._reactor
+ s = t._selectable
+ if s and not s.is_terminal:
+ self.update(t, s, r.now)
+
+ def on_transport_closed(self, event):
+ t = event.transport
+ r = t._reactor
+ s = t._selectable
+ s.terminate()
+ r.update(s)
+ t.unbind()
diff --git a/python/proton/_io.py b/python/proton/_io.py
new file mode 100644
index 0000000..401ba11
--- /dev/null
+++ b/python/proton/_io.py
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import socket
+import select
+import time
+
+PN_INVALID_SOCKET = -1
+
+class IO(object):
+
+ @staticmethod
+ def close(s):
+ s.close()
+
+ @staticmethod
+ def listen(host, port):
+ s = socket.socket()
+ s.bind((host, port))
+ s.listen(10)
+ return s
+
+ @staticmethod
+ def accept(s):
+ return s.accept()
+
+ @staticmethod
+ def connect(host, port):
+ return socket.create_connection((host, port))
+
+ @staticmethod
+ def select(*args, **kwargs):
+ return select.select(*args, **kwargs)
+
+ @staticmethod
+ def sleep(t):
+ time.sleep(t)
+ return
+
+ class Selector(object):
+
+ def __init__(self):
+ self._selectables = set()
+ self._reading = set()
+ self._writing = set()
+ self._deadline = None
+
+ def add(self, selectable):
+ self._selectables.add(selectable)
+ if selectable.reading:
+ self._reading.add(selectable)
+ if selectable.writing:
+ self._writing.add(selectable)
+ if selectable.deadline:
+ if self._deadline is None:
+ self._deadline = selectable.deadline
+ else:
+ self._deadline = min(selectable.deadline, self._deadline)
+
+ def remove(self, selectable):
+ self._selectables.discard(selectable)
+ self._reading.discard(selectable)
+ self._writing.discard(selectable)
+ self.update_deadline()
+
+ @property
+ def selectables(self):
+ return len(self._selectables)
+
+ def update_deadline(self):
+ for sel in self._selectables:
+ if sel.deadline:
+ if self._deadline is None:
+ self._deadline = sel.deadline
+ else:
+ self._deadline = min(sel.deadline, self._deadline)
+
+ def update(self, selectable):
+ self._reading.discard(selectable)
+ self._writing.discard(selectable)
+ if selectable.reading:
+ self._reading.add(selectable)
+ if selectable.writing:
+ self._writing.add(selectable)
+ self.update_deadline()
+
+ def select(self, timeout):
+
+ def select_inner(timeout):
+ r = self._reading
+ w = self._writing
+
+ now = time.time()
+
+ # No timeout or deadline
+ if timeout is None and self._deadline is None:
+ return IO.select(r, w, [])
+
+ if timeout is None:
+ t = max(0, self._deadline - now)
+ return IO.select(r, w, [], t)
+
+ if self._deadline is None:
+ return IO.select(r, w, [], timeout)
+
+ t = max(0, min(timeout, self._deadline - now))
+ if len(r)==0 and len(w)==0:
+ if t > 0: IO.sleep(t)
+ return ([],[],[])
+
+ return IO.select(r, w, [], t)
+
+ r, w, _ = select_inner(timeout)
+
+ # Calculate timed out selectables
+ now = time.time()
+ t = [s for s in self._selectables if s.deadline and now > s.deadline]
+ self._deadline = None
+ self.update_deadline()
+ return r, w, t
diff --git a/python/proton/_message.py b/python/proton/_message.py
index cca498f..2f3709a 100644
--- a/python/proton/_message.py
+++ b/python/proton/_message.py
@@ -39,7 +39,7 @@ from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, \
pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
from . import _compat
-from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._common import isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
from ._data import Data, ulong, symbol
from ._endpoints import Link
from ._exceptions import EXCEPTIONS, MessageException
@@ -52,7 +52,6 @@ except NameError:
unicode = str
-
class Message(object):
"""The L{Message} class is a mutable holder of message content.
@@ -432,7 +431,7 @@ The group-id for any replies.
self.decode(dlv.encoded)
return dlv
- def __repr2__(self):
+ def __repr__(self):
props = []
for attr in ("inferred", "address", "reply_to", "durable", "ttl",
"priority", "first_acquirer", "delivery_count", "id",
@@ -442,11 +441,3 @@ The group-id for any replies.
value = getattr(self, attr)
if value: props.append("%s=%r" % (attr, value))
return "Message(%s)" % ", ".join(props)
-
- def __repr__(self):
- tmp = pn_string(None)
- err = pn_inspect(self._msg, tmp)
- result = pn_string_get(tmp)
- pn_free(tmp)
- self._check(err)
- return result
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index a47625f..6cf5305 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -19,21 +19,16 @@
from __future__ import absolute_import
+from functools import total_ordering
+import heapq
import json
-import os
import logging
+import os
+import time
import traceback
import uuid
-from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
- pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
- pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
- pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
- pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
- pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
- pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
- pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
- pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
+from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE
from ._delivery import Delivery
from ._endpoints import Connection, Endpoint, Link, Session, Terminus
@@ -42,159 +37,175 @@ from ._data import Described, symbol, ulong
from ._message import Message
from ._transport import Transport, SSL, SSLDomain
from ._url import Url
-from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
-from ._events import EventType, EventBase, Handler
-from ._reactor_impl import Selectable, WrappedHandler, _chandler
-from ._wrapper import Wrapper, PYCTX
+from ._common import isstring, unicode2utf8, utf82unicode
+from ._events import Collector, EventType, EventBase, Handler, Event
+from ._selectable import Selectable
-from ._handlers import OutgoingMessageHandler
+from ._handlers import OutgoingMessageHandler, IOHandler
+
+from ._io import IO, PN_INVALID_SOCKET
from . import _compat
from ._compat import queue
-Logger = logging.getLogger("proton")
+
+_logger = logging.getLogger("proton")
def _generate_uuid():
return uuid.uuid4()
-def _timeout2millis(secs):
- if secs is None: return PN_MILLIS_MAX
- return secs2millis(secs)
+def _now():
+ return time.time()
+@total_ordering
+class Task(object):
-def _millis2timeout(millis):
- if millis == PN_MILLIS_MAX: return None
- return millis2secs(millis)
-
-
-class Task(Wrapper):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Task(impl)
+ def __init__(self, reactor, deadline, handler):
+ self._deadline = deadline
+ self._handler = handler
+ self._reactor = reactor
+ self._cancelled = False
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_task_attachments)
-
- def _init(self):
- pass
+ def __lt__(self, rhs):
+ return self._deadline < rhs._deadline
def cancel(self):
- pn_task_cancel(self._impl)
+ self._cancelled = True
+ @property
+ def handler(self):
+ return self._handler
-class Acceptor(Wrapper):
+class TimerSelectable(Selectable):
- def __init__(self, impl):
- Wrapper.__init__(self, impl)
+ def __init__(self, reactor, collector):
+ super(TimerSelectable, self).__init__(None, reactor)
+ self.collect(collector)
+ collector.put(self, Event.SELECTABLE_INIT)
- def set_ssl_domain(self, ssl_domain):
- pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
+ def fileno(self):
+ return PN_INVALID_SOCKET
- def close(self):
- pn_acceptor_close(self._impl)
+ def readable(self):
+ pass
+ def writable(self):
+ pass
-class Reactor(Wrapper):
+ def expired(self):
+ self._reactor.timer_tick()
+ self.deadline = self._reactor.timer_deadline
+ self.update()
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- record = pn_reactor_attachments(impl)
- attrs = pn_void2py(pn_record_get(record, PYCTX))
- if attrs and 'subclass' in attrs:
- return attrs['subclass'](impl=impl)
- else:
- return Reactor(impl=impl)
+class Reactor(object):
def __init__(self, *handlers, **kwargs):
- Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
- for h in handlers:
- self.handler.add(h, on_error=self.on_error_delegate())
-
- def _init(self):
+ self._previous = PN_EVENT_NONE
+ self._timeout = 0
+ self.mark()
+ self._yield = False
+ self._stop = False
+ self._collector = Collector()
+ self._selectable = None
+ self._selectables = 0
+ self._global_handler = IOHandler()
+ self._handler = Handler()
+ self._timerheap = []
+ self._timers = 0
self.errors = []
-
- # on_error relay handler tied to underlying C reactor. Use when the
- # error will always be generated from a callback from this reactor.
- # Needed to prevent reference cycles and be compatible with wrappers.
- class ErrorDelegate(object):
- def __init__(self, reactor):
- self.reactor_impl = reactor._impl
-
- def on_error(self, info):
- ractor = Reactor.wrap(self.reactor_impl)
- ractor.on_error(info)
-
- def on_error_delegate(self):
- return Reactor.ErrorDelegate(self).on_error
+ for h in handlers:
+ self.handler.add(h, on_error=self.on_error)
def on_error(self, info):
self.errors.append(info)
self.yield_()
+ # TODO: need to make this actually return a proxy which catches exceptions raised by the
+ # handler and calls on_error with them.
+ # [Or arrange another way to deal with exceptions thrown by handlers]
+ def _make_handler(self, handler):
+ """
+ Return a proxy handler that dispatches to the provided handler.
+
+ If handler throws an exception then on_error is called with info
+ """
+ return handler
+
def _get_global(self):
- return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
+ return self._global_handler
def _set_global(self, handler):
- impl = _chandler(handler, self.on_error_delegate())
- pn_reactor_set_global_handler(self._impl, impl)
- pn_decref(impl)
+ self._global_handler = self._make_handler(handler)
global_handler = property(_get_global, _set_global)
def _get_timeout(self):
- return _millis2timeout(pn_reactor_get_timeout(self._impl))
+ return self._timeout
def _set_timeout(self, secs):
- return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
+ self._timeout = secs
timeout = property(_get_timeout, _set_timeout)
def yield_(self):
- pn_reactor_yield(self._impl)
+ self._yield = True
def mark(self):
- return pn_reactor_mark(self._impl)
+ """ Set the reactor's notion of "now" to the current time and return it """
+ self._now = _now()
+ return self._now
+
+ @property
+ def now(self):
+ return self._now
def _get_handler(self):
- return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
+ return self._handler
def _set_handler(self, handler):
- impl = _chandler(handler, self.on_error_delegate())
- pn_reactor_set_handler(self._impl, impl)
- pn_decref(impl)
+ self._handler = self._make_handler(handler)
handler = property(_get_handler, _set_handler)
def run(self):
+ # TODO: Why do we timeout like this?
self.timeout = 3.14159265359
self.start()
while self.process(): pass
self.stop()
self.process()
- self.global_handler = None
- self.handler = None
+ # TODO: This isn't correct if we ever run again
+ self._global_handler = None
+ self._handler = None
+ # Cross thread reactor wakeup
def wakeup(self):
- n = pn_reactor_wakeup(self._impl)
- if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
+ # TODO: Do this with pipe and write?
+ #os.write(self._wakeup[1], "x", 1);
+ pass
def start(self):
- pn_reactor_start(self._impl)
+ self.push_event(self, Event.REACTOR_INIT)
+ self._selectable = TimerSelectable(self, self._collector)
+ self._selectable.deadline = self.timer_deadline
+ # TODO set up fd to read for wakeups - but problematic on windows
+ #self._selectable.fileno(self._wakeup[0])
+ #self._selectable.reading = True
+ self.update(self._selectable)
@property
def quiesced(self):
- return pn_reactor_quiesced(self._impl)
+ event = self._collector.peek()
+ if not event:
+ return True
+ if self._collector.more():
+ return False
+ return event.type is Event.REACTOR_QUIESCED
def _check_errors(self):
+ """ Report errors recorded by on_error: print the traceback of all but the last, then raise """
if self.errors:
for exc, value, tb in self.errors[:-1]:
traceback.print_exception(exc, value, tb)
@@ -202,35 +213,104 @@ class Reactor(Wrapper):
_compat.raise_(exc, value, tb)
def process(self):
- result = pn_reactor_process(self._impl)
- self._check_errors()
- return result
+ # result = pn_reactor_process(self._impl)
+ # self._check_errors()
+ # return result
+ self.mark()
+ previous = PN_EVENT_NONE
+ while True:
+ if self._yield:
+ self._yield = False
+ _logger.debug('%s Yielding', self)
+ return True
+ event = self._collector.peek()
+ if event:
+ _logger.debug('%s recvd Event: %r', self, event)
+ type = event.type
+
+ # regular handler
+ handler = event.handler or self._handler
+ event.dispatch(handler)
+
+ event.dispatch(self._global_handler)
+
+ previous = type
+ self._previous = type
+ self._collector.pop()
+ elif not self._stop and (self._timers > 0 or self._selectables > 1):
+ if previous is not Event.REACTOR_QUIESCED and self._previous is not Event.REACTOR_FINAL:
+ self.push_event(self, Event.REACTOR_QUIESCED)
+ self.yield_()
+ else:
+ if self._selectable:
+ self._selectable.terminate()
+ self.update(self._selectable)
+ self._selectable = None
+ else:
+ if self._previous is not Event.REACTOR_FINAL:
+ self.push_event(self, Event.REACTOR_FINAL)
+ _logger.debug('%s Stopping', self)
+ return False
def stop(self):
- pn_reactor_stop(self._impl)
+ self._stop = True
self._check_errors()
- def schedule(self, delay, task):
- impl = _chandler(task, self.on_error_delegate())
- task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
- pn_decref(impl)
+ def stop_events(self):
+ self._collector.release()
+
+ def schedule(self, delay, handler):
+ himpl = self._make_handler(handler)
+ task = Task(self, self._now+delay, himpl)
+ heapq.heappush(self._timerheap, task)
+ self._timers += 1
+ deadline = self._timerheap[0]._deadline
+ if self._selectable:
+ self._selectable.deadline = deadline
+ self.update(self._selectable)
return task
+ def timer_tick(self):
+ while self._timers > 0:
+ t = self._timerheap[0]
+ if t._cancelled:
+ heapq.heappop(self._timerheap)
+ self._timers -= 1
+ elif t._deadline > self._now:
+ return
+ else:
+ heapq.heappop(self._timerheap)
+ self._timers -= 1
+ self.push_event(t, Event.TIMER_TASK)
+
+ @property
+ def timer_deadline(self):
+ while self._timers > 0:
+ t = self._timerheap[0]
+ if t._cancelled:
+ heapq.heappop(self._timerheap)
+ self._timers -= 1
+ else:
+ return t._deadline
+ return None
+
def acceptor(self, host, port, handler=None):
- impl = _chandler(handler, self.on_error_delegate())
- aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
- pn_decref(impl)
- if aimpl:
- return Acceptor(aimpl)
+ impl = self._make_handler(handler)
+ a = Acceptor(self, unicode2utf8(host), int(port), impl)
+ if a:
+ return a
else:
- raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
+ raise IOError("%s (%s:%s)" % (str(self.errors), host, port))
def connection(self, handler=None):
"""Deprecated: use connection_to_host() instead
"""
- impl = _chandler(handler, self.on_error_delegate())
- result = Connection.wrap(pn_reactor_connection(self._impl, impl))
- if impl: pn_decref(impl)
+ impl = self._make_handler(handler)
+ result = Connection()
+ if impl:
+ result.handler = impl
+ result._reactor = self
+ result.collect(self._collector)
return result
def connection_to_host(self, host, port, handler=None):
@@ -247,10 +327,7 @@ class Reactor(Wrapper):
used by the reactor's iohandler to create an outgoing socket
connection. This must be set prior to opening the connection.
"""
- pn_reactor_set_connection_host(self._impl,
- connection._impl,
- unicode2utf8(str(host)),
- unicode2utf8(str(port)))
+ connection.set_address(host, port)
def get_connection_address(self, connection):
"""This may be used to retrieve the remote peer address.
@@ -258,29 +335,23 @@ class Reactor(Wrapper):
address is available. Use the proton.Url class to create a Url object
from the returned value.
"""
- _url = pn_reactor_get_connection_address(self._impl, connection._impl)
+ _url = connection.get_address()
return utf82unicode(_url)
- def selectable(self, handler=None):
- impl = _chandler(handler, self.on_error_delegate())
- result = Selectable.wrap(pn_reactor_selectable(self._impl))
- if impl:
- record = pn_selectable_attachments(result._impl)
- pn_record_set_handler(record, impl)
- pn_decref(impl)
+ def selectable(self, handler=None, delegate=None):
+ if delegate is None:
+ delegate = handler
+ result = Selectable(delegate, self)
+ result.collect(self._collector)
+ result.handler = handler
+ self.push_event(result, Event.SELECTABLE_INIT)
return result
- def update(self, sel):
- pn_reactor_update(self._impl, sel._impl)
+ def update(self, selectable):
+ selectable.update()
def push_event(self, obj, etype):
- pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-
-from ._events import wrappers as _wrappers
-
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
+ self._collector.put(obj, etype)
class EventInjector(object):
@@ -296,6 +367,7 @@ class EventInjector(object):
def __init__(self):
self.queue = queue.Queue()
self.pipe = os.pipe()
+ self._transport = None
self._closed = False
def trigger(self, event):
@@ -320,19 +392,19 @@ class EventInjector(object):
def on_selectable_init(self, event):
sel = event.context
- sel.fileno(self.fileno())
+ #sel.fileno(self.fileno())
sel.reading = True
- event.reactor.update(sel)
+ sel.update()
def on_selectable_readable(self, event):
+ s = event.context
os.read(self.pipe[0], 512)
while not self.queue.empty():
requested = self.queue.get()
- event.reactor.push_event(requested.context, requested.type)
+ s.push_event(requested.context, requested.type)
if self._closed:
- s = event.context
s.terminate()
- event.reactor.update(s)
+ s.update()
class ApplicationEvent(EventBase):
@@ -342,7 +414,8 @@ class ApplicationEvent(EventBase):
"""
def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
- super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
+ super(ApplicationEvent, self).__init__(EventType(typename))
+ self.clazz = PN_PYREF
self.connection = connection
self.session = session
self.link = link
@@ -355,6 +428,10 @@ class ApplicationEvent(EventBase):
self.connection = self.session.connection
self.subject = subject
+ @property
+ def context(self):
+ return self
+
def __repr__(self):
objects = [self.connection, self.session, self.link, self.delivery, self.subject]
return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
@@ -429,7 +506,7 @@ class Transaction(object):
elif event.delivery.remote_state == Delivery.REJECTED:
self.handler.on_transaction_declare_failed(event)
else:
- Logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
+ _logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
self.handler.on_transaction_declare_failed(event)
elif event.delivery == self._discharge:
if event.delivery.remote_state == Delivery.REJECTED:
@@ -569,7 +646,7 @@ class SessionPerConnection(object):
return self._default_session
-class GlobalOverrides(object):
+class GlobalOverrides(Handler):
"""
Internal handler that triggers the necessary socket connect for an
opened connection.
@@ -587,6 +664,49 @@ class GlobalOverrides(object):
return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
+class Acceptor(Handler):
+
+ def __init__(self, reactor, host, port, handler=None):
+ self._ssl_domain = None
+ self._reactor = reactor
+ self._handler = handler
+ sock = IO.listen(host, port)
+ s = reactor.selectable(handler=self, delegate=sock)
+ s.reading = True
+ s._transport = None
+ self._selectable = s
+ reactor.update(s)
+
+ def set_ssl_domain(self, ssl_domain):
+ self._ssl_domain = ssl_domain
+
+ def close(self):
+ if not self._selectable.is_terminal:
+ IO.close(self._selectable)
+ self._selectable.terminate()
+ self._reactor.update(self._selectable)
+
+ def on_selectable_readable(self, event):
+ s = event.selectable
+
+ sock, name = IO.accept(self._selectable)
+ _logger.debug("Accepted connection from %s", name)
+
+ r = self._reactor
+ handler = self._handler or r.handler
+ c = r.connection(handler)
+ c._acceptor = self
+ c.url = Url(host=name[0], port=name[1])
+ t = Transport(Transport.SERVER)
+ if self._ssl_domain:
+ t.ssl(self._ssl_domain)
+ t.bind(c)
+
+ s = r.selectable(delegate=sock)
+ s._transport = t
+ t._selectable = s
+ IOHandler.update(t, s, r.now)
+
class Connector(Handler):
"""
Internal handler that triggers the necessary socket connect for an
@@ -608,14 +728,13 @@ class Connector(Handler):
self.ssl_sni = None
self.max_frame_size = None
- def _connect(self, connection, reactor):
- assert (reactor is not None)
+ def _connect(self, connection):
url = self.address.next()
- reactor.set_connection_host(connection, url.host, str(url.port))
+ connection.url = url
# if virtual-host not set, use host from address as default
if self.virtual_host is None:
connection.hostname = url.host
- Logger.debug("connecting to %r..." % url)
+ _logger.debug("connecting to %r..." % url)
transport = Transport()
if self.sasl_enabled:
@@ -643,10 +762,10 @@ class Connector(Handler):
transport.max_frame_size = self.max_frame_size
def on_connection_local_open(self, event):
- self._connect(event.connection, event.reactor)
+ self._connect(event.connection)
def on_connection_remote_open(self, event):
- Logger.debug("connected to %s" % event.connection.hostname)
+ _logger.debug("connected to %s" % event.connection.hostname)
if self.reconnect:
self.reconnect.reset()
self.transport = None
@@ -661,20 +780,20 @@ class Connector(Handler):
event.transport.unbind()
delay = self.reconnect.next()
if delay == 0:
- Logger.info("Disconnected, reconnecting...")
- self._connect(self.connection, event.reactor)
+ _logger.info("Disconnected, reconnecting...")
+ self._connect(self.connection)
return
else:
- Logger.info("Disconnected will try to reconnect after %s seconds" % delay)
+ _logger.info("Disconnected will try to reconnect after %s seconds" % delay)
event.reactor.schedule(delay, self)
return
else:
- Logger.debug("Disconnected")
+ _logger.debug("Disconnected")
# See connector.cpp: conn.free()/pn_connection_release() here?
self.connection = None
def on_timer_task(self, event):
- self._connect(self.connection, event.reactor)
+ self._connect(self.connection)
class Backoff(object):
@@ -727,7 +846,7 @@ class SSLConfig(object):
self.client.set_trusted_ca_db(certificate_db)
self.server.set_trusted_ca_db(certificate_db)
-def find_config_file():
+def _find_config_file():
confname = 'connect.json'
confpath = ['.', '~/.config/messaging','/etc/messaging']
for d in confpath:
@@ -736,15 +855,15 @@ def find_config_file():
return f
return None
-def get_default_config():
- conf = os.environ.get('MESSAGING_CONNECT_FILE') or find_config_file()
+def _get_default_config():
+ conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
if conf and os.path.isfile(conf):
with open(conf, 'r') as f:
return json.load(f)
else:
return {}
-def get_default_port_for_scheme(scheme):
+def _get_default_port_for_scheme(scheme):
if scheme == 'amqps':
return 5671
else:
@@ -773,7 +892,6 @@ class Container(Reactor):
self.sasl_enabled = True
self.user = None
self.password = None
- Wrapper.__setattr__(self, 'subclass', self.__class__)
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
**kwargs):
@@ -825,9 +943,9 @@ class Container(Reactor):
"""
if not url and not urls and not address:
- config = get_default_config()
+ config = _get_default_config()
scheme = config.get('scheme', 'amqp')
- _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', get_default_port_for_scheme(scheme)))
+ _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme)))
_ssl_domain = None
_kwargs = kwargs
if config.get('user'):
@@ -952,7 +1070,7 @@ class Container(Reactor):
snd.source.address = source
if target:
snd.target.address = target
- if handler != None:
+ if handler is not None:
snd.handler = handler
if tags:
snd.tag_generator = tags
@@ -995,7 +1113,7 @@ class Container(Reactor):
rcv.source.dynamic = True
if target:
rcv.target.address = target
- if handler != None:
+ if handler is not None:
rcv.handler = handler
_apply_link_options(options, rcv)
rcv.open()
diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py
deleted file mode 100644
index 4ffebcd..0000000
--- a/python/proton/_reactor_impl.py
+++ /dev/null
@@ -1,217 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from __future__ import absolute_import
-
-import weakref
-
-from cproton import PN_INVALID_SOCKET, \
- pn_incref, pn_decref, \
- pn_handler_add, pn_handler_clear, pn_pyhandler, \
- pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \
- pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \
- pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \
- pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \
- pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd
-
-from ._common import millis2secs, secs2millis
-from ._wrapper import Wrapper
-
-from . import _compat
-
-_DEFAULT = False
-
-
-class Selectable(Wrapper):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Selectable(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_selectable_attachments)
-
- def _init(self):
- pass
-
- def fileno(self, fd=_DEFAULT):
- if fd is _DEFAULT:
- return pn_selectable_get_fd(self._impl)
- elif fd is None:
- pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
- else:
- pn_selectable_set_fd(self._impl, fd)
-
- def _is_reading(self):
- return pn_selectable_is_reading(self._impl)
-
- def _set_reading(self, val):
- pn_selectable_set_reading(self._impl, bool(val))
-
- reading = property(_is_reading, _set_reading)
-
- def _is_writing(self):
- return pn_selectable_is_writing(self._impl)
-
- def _set_writing(self, val):
- pn_selectable_set_writing(self._impl, bool(val))
-
- writing = property(_is_writing, _set_writing)
-
- def _get_deadline(self):
- tstamp = pn_selectable_get_deadline(self._impl)
- if tstamp:
- return millis2secs(tstamp)
- else:
- return None
-
- def _set_deadline(self, deadline):
- pn_selectable_set_deadline(self._impl, secs2millis(deadline))
-
- deadline = property(_get_deadline, _set_deadline)
-
- def readable(self):
- pn_selectable_readable(self._impl)
-
- def writable(self):
- pn_selectable_writable(self._impl)
-
- def expired(self):
- pn_selectable_expired(self._impl)
-
- def _is_registered(self):
- return pn_selectable_is_registered(self._impl)
-
- def _set_registered(self, registered):
- pn_selectable_set_registered(self._impl, registered)
-
- registered = property(_is_registered, _set_registered,
- doc="""
-The registered property may be get/set by an I/O polling system to
-indicate whether the fd has been registered or not.
-""")
-
- @property
- def is_terminal(self):
- return pn_selectable_is_terminal(self._impl)
-
- def terminate(self):
- pn_selectable_terminate(self._impl)
-
- def release(self):
- pn_selectable_release(self._impl)
-
-
-class _cadapter:
-
- def __init__(self, handler, on_error=None):
- self.handler = handler
- self.on_error = on_error
-
- def dispatch(self, cevent, ctype):
- from ._events import Event
- ev = Event.wrap(cevent, ctype)
- ev.dispatch(self.handler)
-
- def exception(self, exc, val, tb):
- if self.on_error is None:
- _compat.raise_(exc, val, tb)
- else:
- self.on_error((exc, val, tb))
-
-
-class WrappedHandlersChildSurrogate:
- def __init__(self, delegate):
- self.handlers = []
- self.delegate = weakref.ref(delegate)
-
- def on_unhandled(self, method, event):
- from ._events import _dispatch
- delegate = self.delegate()
- if delegate:
- _dispatch(delegate, method, event)
-
-
-class WrappedHandlersProperty(object):
- def __get__(self, obj, clazz):
- if obj is None:
- return None
- return self.surrogate(obj).handlers
-
- def __set__(self, obj, value):
- self.surrogate(obj).handlers = value
-
- def surrogate(self, obj):
- key = "_surrogate"
- objdict = obj.__dict__
- surrogate = objdict.get(key, None)
- if surrogate is None:
- objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
- obj.add(surrogate)
- return surrogate
-
-
-class WrappedHandler(Wrapper):
- handlers = WrappedHandlersProperty()
-
- @classmethod
- def wrap(cls, impl, on_error=None):
- if impl is None:
- return None
- else:
- handler = cls(impl)
- handler.__dict__["on_error"] = on_error
- return handler
-
- def __init__(self, impl_or_constructor):
- Wrapper.__init__(self, impl_or_constructor)
- if list(self.__class__.__mro__).index(WrappedHandler) > 1:
- # instantiate the surrogate
- self.handlers.extend([])
-
- def _on_error(self, info):
- on_error = getattr(self, "on_error", None)
- if on_error is None:
- _compat.raise_(info[0], info[1], info[2])
- else:
- on_error(info)
-
- def add(self, handler, on_error=None):
- if handler is None: return
- if on_error is None: on_error = self._on_error
- impl = _chandler(handler, on_error)
- pn_handler_add(self._impl, impl)
- pn_decref(impl)
-
- def clear(self):
- pn_handler_clear(self._impl)
-
-
-def _chandler(obj, on_error=None):
- if obj is None:
- return None
- elif isinstance(obj, WrappedHandler):
- impl = obj._impl
- pn_incref(impl)
- return impl
- else:
- return pn_pyhandler(_cadapter(obj, on_error))
diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py
new file mode 100644
index 0000000..2125f7d
--- /dev/null
+++ b/python/proton/_selectable.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+
+from ._events import Event
+
+class Selectable(object):
+
+ def __init__(self, delegate, reactor):
+ self._delegate = delegate
+ self.reading = False
+ self.writing = False
+ self._deadline = 0
+ self._terminal = False
+ self._terminated = False
+ self._collector = None
+ self._reactor = reactor
+
+ def release(self):
+ if self._delegate:
+ self._delegate.close()
+
+ def __getattr__(self, name):
+ if self._delegate:
+ return getattr(self._delegate, name)
+ else:
+ return None
+
+ def _get_deadline(self):
+ tstamp = self._deadline
+ if tstamp:
+ return tstamp
+ else:
+ return None
+
+ def _set_deadline(self, deadline):
+ if not deadline:
+ self._deadline = 0
+ else:
+ self._deadline = deadline
+
+ deadline = property(_get_deadline, _set_deadline)
+
+ def collect(self, collector):
+ self._collector = collector
+
+ def push_event(self, context, type):
+ self._collector.put(context, type)
+
+ def update(self):
+ if not self._terminated:
+ if self._terminal:
+ self._terminated = True
+ self.push_event(self, Event.SELECTABLE_FINAL)
+ else:
+ self.push_event(self, Event.SELECTABLE_UPDATED)
+
+ def readable(self):
+ if self._collector:
+ self.push_event(self, Event.SELECTABLE_READABLE)
+
+ def writable(self):
+ if self._collector:
+ self.push_event(self, Event.SELECTABLE_WRITABLE)
+
+ def expired(self):
+ if self._collector:
+ self.push_event(self, Event.SELECTABLE_EXPIRED)
+
+ @property
+ def is_terminal(self):
+ return self._terminal
+
+ def terminate(self):
+ self._terminal = True
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index 3db0078..182fde6 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -88,6 +88,7 @@ class Transport(Wrapper):
def _init(self):
self._sasl = None
self._ssl = None
+ self._reactor = None
def _check(self, err):
if err < 0:
@@ -136,6 +137,10 @@ A callback for trace logging. The callback is passed the transport and log messa
"""Assign a connection to the transport"""
self._check(pn_transport_bind(self._impl, connection._impl))
+ def bind_nothrow(self, connection):
+ """Assign a connection to the transport, without checking the bind result for errors"""
+ pn_transport_bind(self._impl, connection._impl)
+
def unbind(self):
"""Release the connection"""
self._check(pn_transport_unbind(self._impl))
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index 6462b55..38639bb 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -23,8 +23,6 @@ import collections
import time
import threading
-from cproton import pn_reactor_collector, pn_collector_release
-
from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
from ._delivery import Delivery
from ._endpoints import Endpoint, Link
@@ -284,7 +282,7 @@ class BlockingConnection(Handler):
self.run()
self.conn = None
self.container.global_handler = None # break circular ref: container to cadapter.on_error
- pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
+ self.container.stop_events()
self.container = None
def _is_closed(self):
diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py
index 4ee98e8..1e7a33a 100644
--- a/python/proton/_wrapper.py
+++ b/python/proton/_wrapper.py
@@ -43,6 +43,21 @@ EMPTY_ATTRS = EmptyAttrs()
class Wrapper(object):
+ """ Wrapper for python objects that need to be stored in event contexts and be retrieved again from them
+ Quick note on how this works:
+ The actual *python* object has only 3 attributes which redirect into the wrapped C objects:
+ _impl The wrapped C object itself
+ _attrs This is a special pn_record_t holding a PYCTX which is a python dict
+ every attribute in the python object is actually looked up here
+ _record This is the C record itself (so actually identical to _attrs really but
+ a different python type)
+
+ Because the object's actual attributes are stored away they must be initialised *after* the wrapping
+ is set up. This is the purpose of the _init method in the wrapped object. Wrapper.__init__ will call
+ the subclass _init to initialise attributes. So they *must not* be initialised in the subclass __init__
+ before calling the superclass (Wrapper) __init__ or they will not be accessible from the wrapper at all.
+
+ """
def __init__(self, impl_or_constructor, get_context=None):
init = False
diff --git a/python/tests/proton_tests/handler.py b/python/tests/proton_tests/handler.py
index 89376ad..2324073 100644
--- a/python/tests/proton_tests/handler.py
+++ b/python/tests/proton_tests/handler.py
@@ -98,7 +98,7 @@ class HandlerTest(common.Test):
reactor.handler.handlers.append(root)
def event_root(self, event):
- return event.root
+ return event.handler
def event_reactor_handler(self, event):
return event.reactor.handler
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 907f1fc..923af2d 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -374,8 +374,8 @@ class ContainerTest(Test):
def on_connection_opened(self, event):
event.connection.close()
- assert event.container == event.reactor
- assert event.container == container
+ assert event.container is event.reactor
+ assert event.container is container
container.connect(test_handler.url, handler=ConnectionHandler())
container.run()
@@ -418,7 +418,7 @@ class ContainerTest(Test):
self.listener = event.container.listen("%s:%s" % (self.host, self.port))
def on_connection_opened(self, event):
- self.client_addr = event.reactor.get_connection_address(event.connection)
+ self.client_addr = event.connected_address
self.peer_hostname = event.connection.remote_hostname
def on_connection_closing(self, event):
@@ -431,7 +431,7 @@ class ContainerTest(Test):
self.server_addr = None
def on_connection_opened(self, event):
- self.server_addr = event.reactor.get_connection_address(event.connection)
+ self.server_addr = event.connected_address
event.connection.close()
def test_numeric_hostname(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org