You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/04 15:57:36 UTC
[04/41] qpid-proton git commit: PROTON-1850: Split up proton
__init__.py into multiple files - Reformatted python source to (mostly) PEP-8
standards - Control what gets exported from __init__ by restricting what it
imports - Move most of the reactor impl
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_exceptions.py
----------------------------------------------------------------------
diff --git a/python/proton/_exceptions.py b/python/proton/_exceptions.py
new file mode 100644
index 0000000..47420c2
--- /dev/null
+++ b/python/proton/_exceptions.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_TIMEOUT, PN_INTR
+
+
+class ProtonException(Exception):
+ """
+ The root of the proton exception hierarchy. All proton exception
+ classes derive from this exception.
+ """
+ pass
+
+
+class Timeout(ProtonException):
+ """
+ A timeout exception indicates that a blocking operation has timed
+ out.
+ """
+ pass
+
+
+class Interrupt(ProtonException):
+ """
+ An interrupt exception indicates that a blocking operation was interrupted.
+ """
+ pass
+
+
+EXCEPTIONS = {
+ PN_TIMEOUT: Timeout,
+ PN_INTR: Interrupt
+}
+
+
+class MessageException(ProtonException):
+ """
+ The MessageException class is the root of the message exception
+ hierarchy. All exceptions generated by the Message class derive from
+ this exception.
+ """
+ pass
+
+
+class DataException(ProtonException):
+ """
+ The DataException class is the root of the Data exception hierarchy.
+ All exceptions raised by the Data class extend this exception.
+ """
+ pass
+
+
+class TransportException(ProtonException):
+ pass
+
+
+class SSLException(TransportException):
+ pass
+
+
+class SSLUnavailable(SSLException):
+ pass
+
+
+class ConnectionException(ProtonException):
+ pass
+
+
+class SessionException(ProtonException):
+ pass
+
+
+class LinkException(ProtonException):
+ pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_message.py
----------------------------------------------------------------------
diff --git a/python/proton/_message.py b/python/proton/_message.py
new file mode 100644
index 0000000..32a8c72
--- /dev/null
+++ b/python/proton/_message.py
@@ -0,0 +1,465 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_STATUS_SETTLED, PN_DEFAULT_PRIORITY, PN_STATUS_MODIFIED, PN_STATUS_RELEASED, PN_STATUS_ABORTED, \
+ PN_STATUS_REJECTED, PN_STATUS_PENDING, PN_STATUS_UNKNOWN, PN_STATUS_ACCEPTED, \
+ PN_OVERFLOW, \
+ pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \
+ pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \
+ pn_message_get_content_encoding, pn_message_body, \
+ pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \
+ pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \
+ pn_message_is_first_acquirer, pn_message_set_priority, \
+ pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \
+ pn_message_set_user_id, pn_message_set_group_id, \
+ pn_message_id, pn_message_clear, pn_message_set_durable, \
+ pn_message_set_first_acquirer, pn_message_get_delivery_count, \
+ pn_message_decode, pn_message_set_reply_to_group_id, \
+ pn_message_get_group_sequence, pn_message_set_reply_to, \
+ pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \
+ pn_message_instructions, pn_message_get_content_type, \
+ pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \
+ pn_message_set_group_sequence, pn_message_set_inferred, \
+ pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
+
+from . import _compat
+from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._data import Data, ulong, symbol
+from ._endpoints import Link
+from ._exceptions import EXCEPTIONS, MessageException
+
+# Symbolic status constants, one for each PN_STATUS_* code mapped in
+# STATUSES below (except PN_STATUS_UNKNOWN, which maps to None).
+PENDING = Constant("PENDING")
+ACCEPTED = Constant("ACCEPTED")
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
+MODIFIED = Constant("MODIFIED")
+ABORTED = Constant("ABORTED")
+SETTLED = Constant("SETTLED")
+
+# Translates the PN_STATUS_* codes imported from cproton into the symbolic
+# constants defined above.
+STATUSES = {
+ PN_STATUS_ABORTED: ABORTED,
+ PN_STATUS_ACCEPTED: ACCEPTED,
+ PN_STATUS_REJECTED: REJECTED,
+ PN_STATUS_RELEASED: RELEASED,
+ PN_STATUS_MODIFIED: MODIFIED,
+ PN_STATUS_PENDING: PENDING,
+ PN_STATUS_SETTLED: SETTLED,
+ PN_STATUS_UNKNOWN: None
+}
+
+
+class Message(object):
+ """The L{Message} class is a mutable holder of message content.
+
+ @ivar instructions: delivery instructions for the message
+ @type instructions: dict
+ @ivar annotations: infrastructure defined message annotations
+ @type annotations: dict
+ @ivar properties: application defined message properties
+ @type properties: dict
+ @ivar body: message body
+ @type body: bytes | unicode | dict | list | int | long | float | UUID
+ """
+
+ DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
+
+ def __init__(self, body=None, **kwargs):
+ """
+ @param kwargs: Message property name/value pairs to initialise the Message
+ """
+ self._msg = pn_message()
+ self._id = Data(pn_message_id(self._msg))
+ self._correlation_id = Data(pn_message_correlation_id(self._msg))
+ self.instructions = None
+ self.annotations = None
+ self.properties = None
+ self.body = body
+ for k, v in _compat.iteritems(kwargs):
+ getattr(self, k) # Raise exception if it's not a valid attribute.
+ setattr(self, k, v)
+
+ def __del__(self):
+ if hasattr(self, "_msg"):
+ pn_message_free(self._msg)
+ del self._msg
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, MessageException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
+ else:
+ return err
+
+ def _check_property_keys(self):
+ for k in self.properties.keys():
+ if isinstance(k, unicode):
+ # py2 unicode, py3 str (via hack definition)
+ continue
+ # If key is binary then change to string
+ elif isinstance(k, str):
+ # py2 str
+ self.properties[k.encode('utf-8')] = self.properties.pop(k)
+ else:
+ raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
+
+ def _pre_encode(self):
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+ body = Data(pn_message_body(self._msg))
+
+ inst.clear()
+ if self.instructions is not None:
+ inst.put_object(self.instructions)
+ ann.clear()
+ if self.annotations is not None:
+ ann.put_object(self.annotations)
+ props.clear()
+ if self.properties is not None:
+ self._check_property_keys()
+ props.put_object(self.properties)
+ body.clear()
+ if self.body is not None:
+ body.put_object(self.body)
+
+ def _post_decode(self):
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+ body = Data(pn_message_body(self._msg))
+
+ if inst.next():
+ self.instructions = inst.get_object()
+ else:
+ self.instructions = None
+ if ann.next():
+ self.annotations = ann.get_object()
+ else:
+ self.annotations = None
+ if props.next():
+ self.properties = props.get_object()
+ else:
+ self.properties = None
+ if body.next():
+ self.body = body.get_object()
+ else:
+ self.body = None
+
+ def clear(self):
+ """
+ Clears the contents of the L{Message}. All fields will be reset to
+ their default values.
+ """
+ pn_message_clear(self._msg)
+ self.instructions = None
+ self.annotations = None
+ self.properties = None
+ self.body = None
+
+ def _is_inferred(self):
+ return pn_message_is_inferred(self._msg)
+
+ def _set_inferred(self, value):
+ self._check(pn_message_set_inferred(self._msg, bool(value)))
+
+ inferred = property(_is_inferred, _set_inferred, doc="""
+The inferred flag for a message indicates how the message content
+is encoded into AMQP sections. If inferred is true then binary and
+list values in the body of the message will be encoded as AMQP DATA
+and AMQP SEQUENCE sections, respectively. If inferred is false,
+then all values in the body of the message will be encoded as AMQP
+VALUE sections regardless of their type.
+""")
+
+ def _is_durable(self):
+ return pn_message_is_durable(self._msg)
+
+ def _set_durable(self, value):
+ self._check(pn_message_set_durable(self._msg, bool(value)))
+
+ durable = property(_is_durable, _set_durable,
+ doc="""
+The durable property indicates that the message should be held durably
+by any intermediaries taking responsibility for the message.
+""")
+
+ def _get_priority(self):
+ return pn_message_get_priority(self._msg)
+
+ def _set_priority(self, value):
+ self._check(pn_message_set_priority(self._msg, value))
+
+ priority = property(_get_priority, _set_priority,
+ doc="""
+The priority of the message.
+""")
+
+ def _get_ttl(self):
+ return millis2secs(pn_message_get_ttl(self._msg))
+
+ def _set_ttl(self, value):
+ self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
+
+ ttl = property(_get_ttl, _set_ttl,
+ doc="""
+The time to live of the message measured in seconds. Expired messages
+may be dropped.
+""")
+
+ def _is_first_acquirer(self):
+ return pn_message_is_first_acquirer(self._msg)
+
+ def _set_first_acquirer(self, value):
+ self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
+
+ first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
+ doc="""
+True iff the recipient is the first to acquire the message.
+""")
+
+ def _get_delivery_count(self):
+ return pn_message_get_delivery_count(self._msg)
+
+ def _set_delivery_count(self, value):
+ self._check(pn_message_set_delivery_count(self._msg, value))
+
+ delivery_count = property(_get_delivery_count, _set_delivery_count,
+ doc="""
+The number of delivery attempts made for this message.
+""")
+
+ def _get_id(self):
+ return self._id.get_object()
+
+ def _set_id(self, value):
+ if isinteger(value):
+ value = ulong(value)
+ self._id.rewind()
+ self._id.put_object(value)
+
+ id = property(_get_id, _set_id,
+ doc="""
+The id of the message.
+""")
+
+ def _get_user_id(self):
+ return pn_message_get_user_id(self._msg)
+
+ def _set_user_id(self, value):
+ self._check(pn_message_set_user_id(self._msg, value))
+
+ user_id = property(_get_user_id, _set_user_id,
+ doc="""
+The user id of the message creator.
+""")
+
+ def _get_address(self):
+ return utf82unicode(pn_message_get_address(self._msg))
+
+ def _set_address(self, value):
+ self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
+
+ address = property(_get_address, _set_address,
+ doc="""
+The address of the message.
+""")
+
+ def _get_subject(self):
+ return utf82unicode(pn_message_get_subject(self._msg))
+
+ def _set_subject(self, value):
+ self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
+
+ subject = property(_get_subject, _set_subject,
+ doc="""
+The subject of the message.
+""")
+
+ def _get_reply_to(self):
+ return utf82unicode(pn_message_get_reply_to(self._msg))
+
+ def _set_reply_to(self, value):
+ self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
+
+ reply_to = property(_get_reply_to, _set_reply_to,
+ doc="""
+The reply-to address for the message.
+""")
+
+ def _get_correlation_id(self):
+ return self._correlation_id.get_object()
+
+ def _set_correlation_id(self, value):
+ if isinteger(value):
+ value = ulong(value)
+ self._correlation_id.rewind()
+ self._correlation_id.put_object(value)
+
+ correlation_id = property(_get_correlation_id, _set_correlation_id,
+ doc="""
+The correlation-id for the message.
+""")
+
+ def _get_content_type(self):
+ return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
+
+ def _set_content_type(self, value):
+ self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
+
+ content_type = property(_get_content_type, _set_content_type,
+ doc="""
+The content-type of the message.
+""")
+
+ def _get_content_encoding(self):
+ return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
+
+ def _set_content_encoding(self, value):
+ self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
+
+ content_encoding = property(_get_content_encoding, _set_content_encoding,
+ doc="""
+The content-encoding of the message.
+""")
+
+ def _get_expiry_time(self):
+ return millis2secs(pn_message_get_expiry_time(self._msg))
+
+ def _set_expiry_time(self, value):
+ self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
+
+ expiry_time = property(_get_expiry_time, _set_expiry_time,
+ doc="""
+The expiry time of the message.
+""")
+
+ def _get_creation_time(self):
+ return millis2secs(pn_message_get_creation_time(self._msg))
+
+ def _set_creation_time(self, value):
+ self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
+
+ creation_time = property(_get_creation_time, _set_creation_time,
+ doc="""
+The creation time of the message.
+""")
+
+ def _get_group_id(self):
+ return utf82unicode(pn_message_get_group_id(self._msg))
+
+ def _set_group_id(self, value):
+ self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
+
+ group_id = property(_get_group_id, _set_group_id,
+ doc="""
+The group id of the message.
+""")
+
+ def _get_group_sequence(self):
+ return pn_message_get_group_sequence(self._msg)
+
+ def _set_group_sequence(self, value):
+ self._check(pn_message_set_group_sequence(self._msg, value))
+
+ group_sequence = property(_get_group_sequence, _set_group_sequence,
+ doc="""
+The sequence of the message within its group.
+""")
+
+ def _get_reply_to_group_id(self):
+ return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
+
+ def _set_reply_to_group_id(self, value):
+ self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
+
+ reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
+ doc="""
+The group-id for any replies.
+""")
+
+ def encode(self):
+ self._pre_encode()
+ sz = 16
+ while True:
+ err, data = pn_message_encode(self._msg, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return data
+
+ def decode(self, data):
+ self._check(pn_message_decode(self._msg, data))
+ self._post_decode()
+
+ def send(self, sender, tag=None):
+ dlv = sender.delivery(tag or sender.delivery_tag())
+ encoded = self.encode()
+ sender.stream(encoded)
+ sender.advance()
+ if sender.snd_settle_mode == Link.SND_SETTLED:
+ dlv.settle()
+ return dlv
+
+ def recv(self, link):
+ """
+ Receives and decodes the message content for the current delivery
+ from the link. Upon success it will return the current delivery
+ for the link. If there is no current delivery, or if the current
+ delivery is incomplete, or if the link is not a receiver, it will
+ return None.
+
+ @type link: Link
+ @param link: the link to receive a message from
+ @return the delivery associated with the decoded message (or None)
+
+ """
+ if link.is_sender: return None
+ dlv = link.current
+ if not dlv or dlv.partial: return None
+ dlv.encoded = link.recv(dlv.pending)
+ link.advance()
+ # the sender has already forgotten about the delivery, so we might
+ # as well too
+ if link.remote_snd_settle_mode == Link.SND_SETTLED:
+ dlv.settle()
+ self.decode(dlv.encoded)
+ return dlv
+
+ def __repr2__(self):
+ props = []
+ for attr in ("inferred", "address", "reply_to", "durable", "ttl",
+ "priority", "first_acquirer", "delivery_count", "id",
+ "correlation_id", "user_id", "group_id", "group_sequence",
+ "reply_to_group_id", "instructions", "annotations",
+ "properties", "body"):
+ value = getattr(self, attr)
+ if value: props.append("%s=%r" % (attr, value))
+ return "Message(%s)" % ", ".join(props)
+
+ def __repr__(self):
+ tmp = pn_string(None)
+ err = pn_inspect(self._msg, tmp)
+ result = pn_string_get(tmp)
+ pn_free(tmp)
+ self._check(err)
+ return result
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_reactor_impl.py
----------------------------------------------------------------------
diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py
new file mode 100644
index 0000000..39986ff
--- /dev/null
+++ b/python/proton/_reactor_impl.py
@@ -0,0 +1,217 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import weakref
+
+from cproton import PN_INVALID_SOCKET, \
+ pn_incref, pn_decref, \
+ pn_handler_add, pn_handler_clear, pn_pyhandler, \
+ pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \
+ pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \
+ pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \
+ pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \
+ pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd
+
+from ._common import millis2secs, secs2millis
+from ._wrapper import Wrapper
+
+from . import _compat
+
+# Sentinel default for Selectable.fileno(): lets the method tell "no argument
+# given" (read the fd) apart from an explicit None (reset the fd).
+_DEFAULT = object()
+
+
+class Selectable(Wrapper):
+
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ return Selectable(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_selectable_attachments)
+
+ def _init(self):
+ pass
+
+ def fileno(self, fd=_DEFAULT):
+ if fd is _DEFAULT:
+ return pn_selectable_get_fd(self._impl)
+ elif fd is None:
+ pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
+ else:
+ pn_selectable_set_fd(self._impl, fd)
+
+ def _is_reading(self):
+ return pn_selectable_is_reading(self._impl)
+
+ def _set_reading(self, val):
+ pn_selectable_set_reading(self._impl, bool(val))
+
+ reading = property(_is_reading, _set_reading)
+
+ def _is_writing(self):
+ return pn_selectable_is_writing(self._impl)
+
+ def _set_writing(self, val):
+ pn_selectable_set_writing(self._impl, bool(val))
+
+ writing = property(_is_writing, _set_writing)
+
+ def _get_deadline(self):
+ tstamp = pn_selectable_get_deadline(self._impl)
+ if tstamp:
+ return millis2secs(tstamp)
+ else:
+ return None
+
+ def _set_deadline(self, deadline):
+ pn_selectable_set_deadline(self._impl, secs2millis(deadline))
+
+ deadline = property(_get_deadline, _set_deadline)
+
+ def readable(self):
+ pn_selectable_readable(self._impl)
+
+ def writable(self):
+ pn_selectable_writable(self._impl)
+
+ def expired(self):
+ pn_selectable_expired(self._impl)
+
+ def _is_registered(self):
+ return pn_selectable_is_registered(self._impl)
+
+ def _set_registered(self, registered):
+ pn_selectable_set_registered(self._impl, registered)
+
+ registered = property(_is_registered, _set_registered,
+ doc="""
+The registered property may be get/set by an I/O polling system to
+indicate whether the fd has been registered or not.
+""")
+
+ @property
+ def is_terminal(self):
+ return pn_selectable_is_terminal(self._impl)
+
+ def terminate(self):
+ pn_selectable_terminate(self._impl)
+
+ def release(self):
+ pn_selectable_release(self._impl)
+
+
+class _cadapter:
+
+ def __init__(self, handler, on_error=None):
+ self.handler = handler
+ self.on_error = on_error
+
+ def dispatch(self, cevent, ctype):
+ from ._events import Event
+ ev = Event.wrap(cevent, ctype)
+ ev.dispatch(self.handler)
+
+ def exception(self, exc, val, tb):
+ if self.on_error is None:
+ _compat.raise_(exc, val, tb)
+ else:
+ self.on_error((exc, val, tb))
+
+
+class WrappedHandlersChildSurrogate:
+ def __init__(self, delegate):
+ self.handlers = []
+ self.delegate = weakref.ref(delegate)
+
+ def on_unhandled(self, method, event):
+ from ._events import dispatch
+ delegate = self.delegate()
+ if delegate:
+ dispatch(delegate, method, event)
+
+
+class WrappedHandlersProperty(object):
+ def __get__(self, obj, clazz):
+ if obj is None:
+ return None
+ return self.surrogate(obj).handlers
+
+ def __set__(self, obj, value):
+ self.surrogate(obj).handlers = value
+
+ def surrogate(self, obj):
+ key = "_surrogate"
+ objdict = obj.__dict__
+ surrogate = objdict.get(key, None)
+ if surrogate is None:
+ objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
+ obj.add(surrogate)
+ return surrogate
+
+
+class WrappedHandler(Wrapper):
+ handlers = WrappedHandlersProperty()
+
+ @classmethod
+ def wrap(cls, impl, on_error=None):
+ if impl is None:
+ return None
+ else:
+ handler = cls(impl)
+ handler.__dict__["on_error"] = on_error
+ return handler
+
+ def __init__(self, impl_or_constructor):
+ Wrapper.__init__(self, impl_or_constructor)
+ if list(self.__class__.__mro__).index(WrappedHandler) > 1:
+ # instantiate the surrogate
+ self.handlers.extend([])
+
+ def _on_error(self, info):
+ on_error = getattr(self, "on_error", None)
+ if on_error is None:
+ _compat.raise_(info[0], info[1], info[2])
+ else:
+ on_error(info)
+
+ def add(self, handler, on_error=None):
+ if handler is None: return
+ if on_error is None: on_error = self._on_error
+ impl = _chandler(handler, on_error)
+ pn_handler_add(self._impl, impl)
+ pn_decref(impl)
+
+ def clear(self):
+ pn_handler_clear(self._impl)
+
+
+def _chandler(obj, on_error=None):
+ if obj is None:
+ return None
+ elif isinstance(obj, WrappedHandler):
+ impl = obj._impl
+ pn_incref(impl)
+ return impl
+ else:
+ return pn_pyhandler(_cadapter(obj, on_error))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_transport.py
----------------------------------------------------------------------
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
new file mode 100644
index 0000000..3db0078
--- /dev/null
+++ b/python/proton/_transport.py
@@ -0,0 +1,524 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_SASL_AUTH, PN_SASL_PERM, PN_SASL_SYS, PN_SSL_RESUME_REUSED, PN_SASL_NONE, PN_SSL_SHA1, \
+ PN_SSL_CERT_SUBJECT_COUNTRY_NAME, PN_SASL_OK, PN_SSL_RESUME_UNKNOWN, PN_EOS, PN_SSL_ANONYMOUS_PEER, PN_SSL_MD5, \
+ PN_SSL_CERT_SUBJECT_COMMON_NAME, PN_SSL_VERIFY_PEER, PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY, PN_SSL_MODE_SERVER, \
+ PN_TRACE_DRV, PN_TRACE_RAW, pn_transport, PN_SSL_SHA256, PN_TRACE_FRM, PN_SSL_MODE_CLIENT, PN_SASL_TEMP, \
+ PN_SSL_SHA512, PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT, PN_OK, PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE, \
+ PN_SSL_VERIFY_PEER_NAME, PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME, PN_SSL_RESUME_NEW, PN_TRACE_OFF, \
+ pn_transport_get_channel_max, pn_transport_capacity, pn_transport_push, pn_transport_get_user, pn_transport_tick, \
+ pn_transport_set_max_frame, pn_transport_attachments, pn_transport_unbind, pn_transport_peek, \
+ pn_transport_set_channel_max, pn_transport_close_tail, pn_transport_condition, pn_transport_is_encrypted, \
+ pn_transport_get_frames_input, pn_transport_bind, pn_transport_closed, pn_transport_get_idle_timeout, \
+ pn_transport_get_remote_idle_timeout, pn_transport_get_frames_output, pn_transport_pending, \
+ pn_transport_set_pytracer, pn_transport_close_head, pn_transport_get_remote_max_frame, \
+ pn_transport_is_authenticated, pn_transport_set_idle_timeout, pn_transport_log, pn_transport_get_pytracer, \
+ pn_transport_require_auth, pn_transport_get_max_frame, pn_transport_set_server, pn_transport_remote_channel_max, \
+ pn_transport_require_encryption, pn_transport_pop, pn_transport_connection, \
+ pn_sasl, pn_sasl_set_allow_insecure_mechs, pn_sasl_outcome, pn_transport_error, pn_sasl_get_user, \
+ pn_sasl_extended, pn_sasl_done, pn_sasl_get_allow_insecure_mechs, pn_sasl_allowed_mechs, \
+ pn_sasl_config_name, pn_sasl_config_path, \
+ pn_ssl, pn_ssl_init, pn_ssl_domain_allow_unsecured_client, pn_ssl_domain_free, \
+ pn_ssl_domain, pn_transport_trace, pn_ssl_resume_status, pn_sasl_get_mech, \
+ pn_ssl_domain_set_trusted_ca_db, pn_ssl_get_remote_subject_subfield, pn_ssl_present, \
+ pn_ssl_get_remote_subject, pn_ssl_domain_set_credentials, pn_ssl_domain_set_peer_authentication, \
+ pn_ssl_get_peer_hostname, pn_ssl_set_peer_hostname, pn_ssl_get_cipher_name, pn_ssl_get_cert_fingerprint, \
+ pn_ssl_get_protocol_name, \
+ pn_error_text
+
+from ._common import millis2secs, secs2millis, unicode2utf8, utf82unicode
+from ._condition import cond2obj
+from ._exceptions import EXCEPTIONS, TransportException, SessionException, SSLException, SSLUnavailable
+from ._wrapper import Wrapper
+
+
+class TraceAdapter:
+
+ def __init__(self, tracer):
+ self.tracer = tracer
+
+ def __call__(self, trans_impl, message):
+ self.tracer(Transport.wrap(trans_impl), message)
+
+
+class Transport(Wrapper):
+ TRACE_OFF = PN_TRACE_OFF
+ TRACE_DRV = PN_TRACE_DRV
+ TRACE_FRM = PN_TRACE_FRM
+ TRACE_RAW = PN_TRACE_RAW
+
+ CLIENT = 1
+ SERVER = 2
+
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ return Transport(_impl=impl)
+
+ def __init__(self, mode=None, _impl=pn_transport):
+ Wrapper.__init__(self, _impl, pn_transport_attachments)
+ if mode == Transport.SERVER:
+ pn_transport_set_server(self._impl)
+ elif mode is None or mode == Transport.CLIENT:
+ pass
+ else:
+ raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
+
+ def _init(self):
+ self._sasl = None
+ self._ssl = None
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, TransportException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
+ else:
+ return err
+
+ def _set_tracer(self, tracer):
+ pn_transport_set_pytracer(self._impl, TraceAdapter(tracer))
+
+ def _get_tracer(self):
+ adapter = pn_transport_get_pytracer(self._impl)
+ if adapter:
+ return adapter.tracer
+ else:
+ return None
+
+ tracer = property(_get_tracer, _set_tracer,
+ doc="""
+A callback for trace logging. The callback is passed the transport and log message.
+""")
+
+ def log(self, message):
+ pn_transport_log(self._impl, message)
+
+ def require_auth(self, bool):
+ pn_transport_require_auth(self._impl, bool)
+
+ @property
+ def authenticated(self):
+ return pn_transport_is_authenticated(self._impl)
+
+ def require_encryption(self, bool):
+ pn_transport_require_encryption(self._impl, bool)
+
+ @property
+ def encrypted(self):
+ return pn_transport_is_encrypted(self._impl)
+
+ @property
+ def user(self):
+ return pn_transport_get_user(self._impl)
+
+ def bind(self, connection):
+ """Assign a connection to the transport"""
+ self._check(pn_transport_bind(self._impl, connection._impl))
+
+ def unbind(self):
+ """Release the connection"""
+ self._check(pn_transport_unbind(self._impl))
+
+ def trace(self, n):
+ pn_transport_trace(self._impl, n)
+
+ def tick(self, now):
+ """Process any timed events (like heartbeat generation).
+ now = seconds since epoch (float).
+ """
+ return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
+
+ def capacity(self):
+ c = pn_transport_capacity(self._impl)
+ if c >= PN_EOS:
+ return c
+ else:
+ return self._check(c)
+
+ def push(self, binary):
+ n = self._check(pn_transport_push(self._impl, binary))
+ if n != len(binary):
+ raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
+
+ def close_tail(self):
+ self._check(pn_transport_close_tail(self._impl))
+
+ def pending(self):
+ p = pn_transport_pending(self._impl)
+ if p >= PN_EOS:
+ return p
+ else:
+ return self._check(p)
+
+ def peek(self, size):
+ cd, out = pn_transport_peek(self._impl, size)
+ if cd == PN_EOS:
+ return None
+ else:
+ self._check(cd)
+ return out
+
+ def pop(self, size):
+ pn_transport_pop(self._impl, size)
+
+ def close_head(self):
+ self._check(pn_transport_close_head(self._impl))
+
+ @property
+ def closed(self):
+ return pn_transport_closed(self._impl)
+
+ # AMQP 1.0 max-frame-size
+ def _get_max_frame_size(self):
+ return pn_transport_get_max_frame(self._impl)
+
+ def _set_max_frame_size(self, value):
+ pn_transport_set_max_frame(self._impl, value)
+
+ max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
+ doc="""
+Sets the maximum size for received frames (in bytes).
+""")
+
+ @property
+ def remote_max_frame_size(self):
+ return pn_transport_get_remote_max_frame(self._impl)
+
+ def _get_channel_max(self):
+ return pn_transport_get_channel_max(self._impl)
+
+ def _set_channel_max(self, value):
+ if pn_transport_set_channel_max(self._impl, value):
+ raise SessionException("Too late to change channel max.")
+
+ channel_max = property(_get_channel_max, _set_channel_max,
+ doc="""
+Sets the maximum channel that may be used on the transport.
+""")
+
+ @property
+ def remote_channel_max(self):
+ return pn_transport_remote_channel_max(self._impl)
+
+ # AMQP 1.0 idle-time-out
+ def _get_idle_timeout(self):
+ return millis2secs(pn_transport_get_idle_timeout(self._impl))
+
+ def _set_idle_timeout(self, sec):
+ pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
+
+ idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+ doc="""
+The idle timeout of the connection (float, in seconds).
+""")
+
+ @property
+ def remote_idle_timeout(self):
+ return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
+
+ @property
+ def frames_output(self):
+ return pn_transport_get_frames_output(self._impl)
+
+ @property
+ def frames_input(self):
+ return pn_transport_get_frames_input(self._impl)
+
+ def sasl(self):
+ return SASL(self)
+
+ def ssl(self, domain=None, session_details=None):
+ # SSL factory (singleton for this transport)
+ if not self._ssl:
+ self._ssl = SSL(self, domain, session_details)
+ return self._ssl
+
+ @property
+ def condition(self):
+ return cond2obj(pn_transport_condition(self._impl))
+
+ @property
+ def connection(self):
+ from . import _endpoints
+ return _endpoints.Connection.wrap(pn_transport_connection(self._impl))
+
+
+class SASLException(TransportException):
+ pass
+
+
+class SASL(Wrapper):
+ OK = PN_SASL_OK
+ AUTH = PN_SASL_AUTH
+ SYS = PN_SASL_SYS
+ PERM = PN_SASL_PERM
+ TEMP = PN_SASL_TEMP
+
+ @staticmethod
+ def extended():
+ return pn_sasl_extended()
+
+ def __init__(self, transport):
+ Wrapper.__init__(self, transport._impl, pn_transport_attachments)
+ self._sasl = pn_sasl(transport._impl)
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, SASLException)
+ raise exc("[%s]" % (err))
+ else:
+ return err
+
+ @property
+ def user(self):
+ return pn_sasl_get_user(self._sasl)
+
+ @property
+ def mech(self):
+ return pn_sasl_get_mech(self._sasl)
+
+ @property
+ def outcome(self):
+ outcome = pn_sasl_outcome(self._sasl)
+ if outcome == PN_SASL_NONE:
+ return None
+ else:
+ return outcome
+
+ def allowed_mechs(self, mechs):
+ pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
+
+ def _get_allow_insecure_mechs(self):
+ return pn_sasl_get_allow_insecure_mechs(self._sasl)
+
+ def _set_allow_insecure_mechs(self, insecure):
+ pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
+
+ allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
+ doc="""
+Allow unencrypted cleartext passwords (PLAIN mech)
+""")
+
+ def done(self, outcome):
+ pn_sasl_done(self._sasl, outcome)
+
+ def config_name(self, name):
+ pn_sasl_config_name(self._sasl, name)
+
+ def config_path(self, path):
+ pn_sasl_config_path(self._sasl, path)
+
+
+class SSLDomain(object):
+ MODE_CLIENT = PN_SSL_MODE_CLIENT
+ MODE_SERVER = PN_SSL_MODE_SERVER
+ VERIFY_PEER = PN_SSL_VERIFY_PEER
+ VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
+ ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
+
+ def __init__(self, mode):
+ self._domain = pn_ssl_domain(mode)
+ if self._domain is None:
+ raise SSLUnavailable()
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, SSLException)
+ raise exc("SSL failure.")
+ else:
+ return err
+
+ def set_credentials(self, cert_file, key_file, password):
+ return self._check(pn_ssl_domain_set_credentials(self._domain,
+ cert_file, key_file,
+ password))
+
+ def set_trusted_ca_db(self, certificate_db):
+ return self._check(pn_ssl_domain_set_trusted_ca_db(self._domain,
+ certificate_db))
+
+ def set_peer_authentication(self, verify_mode, trusted_CAs=None):
+ return self._check(pn_ssl_domain_set_peer_authentication(self._domain,
+ verify_mode,
+ trusted_CAs))
+
+ def allow_unsecured_client(self):
+ return self._check(pn_ssl_domain_allow_unsecured_client(self._domain))
+
+ def __del__(self):
+ pn_ssl_domain_free(self._domain)
+
+
+class SSL(object):
+
+ @staticmethod
+ def present():
+ return pn_ssl_present()
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, SSLException)
+ raise exc("SSL failure.")
+ else:
+ return err
+
+ def __new__(cls, transport, domain, session_details=None):
+ """Enforce a singleton SSL object per Transport"""
+ if transport._ssl:
+ # unfortunately, we've combined the allocation and the configuration in a
+ # single step. So catch any attempt by the application to provide what
+ # may be a different configuration than the original (hack)
+ ssl = transport._ssl
+ if (domain and (ssl._domain is not domain) or
+ session_details and (ssl._session_details is not session_details)):
+ raise SSLException("Cannot re-configure existing SSL object!")
+ else:
+ obj = super(SSL, cls).__new__(cls)
+ obj._domain = domain
+ obj._session_details = session_details
+ session_id = None
+ if session_details:
+ session_id = session_details.get_session_id()
+ obj._ssl = pn_ssl(transport._impl)
+ if obj._ssl is None:
+ raise SSLUnavailable()
+ if domain:
+ pn_ssl_init(obj._ssl, domain._domain, session_id)
+ transport._ssl = obj
+ return transport._ssl
+
+ def cipher_name(self):
+ rc, name = pn_ssl_get_cipher_name(self._ssl, 128)
+ if rc:
+ return name
+ return None
+
+ def protocol_name(self):
+ rc, name = pn_ssl_get_protocol_name(self._ssl, 128)
+ if rc:
+ return name
+ return None
+
+ SHA1 = PN_SSL_SHA1
+ SHA256 = PN_SSL_SHA256
+ SHA512 = PN_SSL_SHA512
+ MD5 = PN_SSL_MD5
+
+ CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
+ CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
+ CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
+ CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
+ CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
+ CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
+
+ def get_cert_subject_subfield(self, subfield_name):
+ subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
+ return subfield_value
+
+ def get_cert_subject(self):
+ subject = pn_ssl_get_remote_subject(self._ssl)
+ return subject
+
+ def _get_cert_subject_unknown_subfield(self):
+ # Pass in an unhandled enum
+ return self.get_cert_subject_subfield(10)
+
+ # Convenience functions for obtaining the subfields of the subject field.
+ def get_cert_common_name(self):
+ return self.get_cert_subject_subfield(SSL.CERT_COMMON_NAME)
+
+ def get_cert_organization(self):
+ return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_NAME)
+
+ def get_cert_organization_unit(self):
+ return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_UNIT)
+
+ def get_cert_locality_or_city(self):
+ return self.get_cert_subject_subfield(SSL.CERT_CITY_OR_LOCALITY)
+
+ def get_cert_country(self):
+ return self.get_cert_subject_subfield(SSL.CERT_COUNTRY_NAME)
+
+ def get_cert_state_or_province(self):
+ return self.get_cert_subject_subfield(SSL.CERT_STATE_OR_PROVINCE)
+
+ def get_cert_fingerprint(self, fingerprint_length, digest_name):
+ rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
+ if rc == PN_OK:
+ return fingerprint_str
+ return None
+
+ # Convenience functions for obtaining fingerprint for specific hashing algorithms
+ def _get_cert_fingerprint_unknown_hash_alg(self):
+ return self.get_cert_fingerprint(41, 10)
+
+ def get_cert_fingerprint_sha1(self):
+ return self.get_cert_fingerprint(41, SSL.SHA1)
+
+ def get_cert_fingerprint_sha256(self):
+ # sha256 produces a fingerprint that is 64 characters long
+ return self.get_cert_fingerprint(65, SSL.SHA256)
+
+ def get_cert_fingerprint_sha512(self):
+ # sha512 produces a fingerprint that is 128 characters long
+ return self.get_cert_fingerprint(129, SSL.SHA512)
+
+ def get_cert_fingerprint_md5(self):
+ return self.get_cert_fingerprint(33, SSL.MD5)
+
+ @property
+ def remote_subject(self):
+ return pn_ssl_get_remote_subject(self._ssl)
+
+ RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
+ RESUME_NEW = PN_SSL_RESUME_NEW
+ RESUME_REUSED = PN_SSL_RESUME_REUSED
+
+ def resume_status(self):
+ return pn_ssl_resume_status(self._ssl)
+
+ def _set_peer_hostname(self, hostname):
+ self._check(pn_ssl_set_peer_hostname(self._ssl, unicode2utf8(hostname)))
+
+ def _get_peer_hostname(self):
+ err, name = pn_ssl_get_peer_hostname(self._ssl, 1024)
+ self._check(err)
+ return utf82unicode(name)
+
+ peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
+ doc="""
+Manage the expected name of the remote peer. Used to authenticate the remote.
+""")
+
+
+class SSLSessionDetails(object):
+ """ Unique identifier for the SSL session. Used to resume a previous session on a new
+ SSL connection.
+ """
+
+ def __init__(self, session_id):
+ self._session_id = session_id
+
+ def get_session_id(self):
+ return self._session_id
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_url.py
----------------------------------------------------------------------
diff --git a/python/proton/_url.py b/python/proton/_url.py
new file mode 100644
index 0000000..b4a9a6a
--- /dev/null
+++ b/python/proton/_url.py
@@ -0,0 +1,161 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import socket
+
+from cproton import pn_url, pn_url_free, pn_url_parse, pn_url_str, pn_url_get_port, pn_url_get_scheme, \
+ pn_url_get_host, pn_url_get_username, pn_url_get_password, pn_url_get_path, pn_url_set_scheme, pn_url_set_host, \
+ pn_url_set_username, pn_url_set_password, pn_url_set_port, pn_url_set_path
+
+from ._common import unicode2utf8
+
+
+class Url(object):
+ """
+ Simple URL parser/constructor, handles URLs of the form:
+
+ <scheme>://<user>:<password>@<host>:<port>/<path>
+
+ All components can be None if not specified in the URL string.
+
+ The port can be specified as a service name, e.g. 'amqp' in the
+ URL string but Url.port always gives the integer value.
+
+ Warning: The placement of user and password in URLs is not
+ recommended. It can result in credentials leaking out in program
+ logs. Use connection configuration attributes instead.
+
+ @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
+ @ivar username: Username
+ @ivar password: Password
+ @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
+ @ivar port: Integer port.
+ @ivar host_port: Returns host:port
+ """
+
+ AMQPS = "amqps"
+ AMQP = "amqp"
+
+ class Port(int):
+ """An integer port number that can be constructed from a service name string"""
+
+ def __new__(cls, value):
+ """@param value: integer port number or string service name."""
+ port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
+ setattr(port, 'name', str(value))
+ return port
+
+ def __eq__(self, x):
+ return str(self) == x or int(self) == x
+
+ def __ne__(self, x):
+ return not self == x
+
+ def __str__(self):
+ return str(self.name)
+
+ @staticmethod
+ def _port_int(value):
+ """Convert value, an integer or a service name string, into an integer port number."""
+ try:
+ return int(value)
+ except ValueError:
+ try:
+ return socket.getservbyname(value)
+ except socket.error:
+ # Not every system has amqp/amqps defined as a service
+ if value == Url.AMQPS:
+ return 5671
+ elif value == Url.AMQP:
+ return 5672
+ else:
+ raise ValueError("Not a valid port number or service name: '%s'" % value)
+
+ def __init__(self, url=None, defaults=True, **kwargs):
+ """
+ @param url: URL string to parse.
+ @param defaults: If true, fill in missing default values in the URL.
+ If false, you can fill them in later by calling self.defaults()
+ @param kwargs: scheme, username, password, host, port, path.
+ If specified, replaces corresponding part in url string.
+ """
+ if url:
+ self._url = pn_url_parse(unicode2utf8(str(url)))
+ if not self._url: raise ValueError("Invalid URL '%s'" % url)
+ else:
+ self._url = pn_url()
+ for k in kwargs: # Let kwargs override values parsed from url
+ getattr(self, k) # Check for invalid kwargs
+ setattr(self, k, kwargs[k])
+ if defaults: self.defaults()
+
+ class PartDescriptor(object):
+ def __init__(self, part):
+ self.getter = globals()["pn_url_get_%s" % part]
+ self.setter = globals()["pn_url_set_%s" % part]
+
+ def __get__(self, obj, type=None): return self.getter(obj._url)
+
+ def __set__(self, obj, value): return self.setter(obj._url, str(value))
+
+ scheme = PartDescriptor('scheme')
+ username = PartDescriptor('username')
+ password = PartDescriptor('password')
+ host = PartDescriptor('host')
+ path = PartDescriptor('path')
+
+ def _get_port(self):
+ portstr = pn_url_get_port(self._url)
+ return portstr and Url.Port(portstr)
+
+ def _set_port(self, value):
+ if value is None:
+ pn_url_set_port(self._url, None)
+ else:
+ pn_url_set_port(self._url, str(Url.Port(value)))
+
+ port = property(_get_port, _set_port)
+
+ def __str__(self):
+ return pn_url_str(self._url)
+
+ def __repr__(self):
+ return "Url(%s://%s/%s)" % (self.scheme, self.host, self.path)
+
+ def __eq__(self, x):
+ return str(self) == str(x)
+
+ def __ne__(self, x):
+ return not self == x
+
+ def __del__(self):
+ pn_url_free(self._url)
+ del self._url
+
+ def defaults(self):
+ """
+ Fill in missing values (scheme, host or port) with defaults
+ @return: self
+ """
+ self.scheme = self.scheme or self.AMQP
+ self.host = self.host or '0.0.0.0'
+ self.port = self.port or self.Port(self.scheme)
+ return self
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_wrapper.py
----------------------------------------------------------------------
diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py
new file mode 100644
index 0000000..805ecb1
--- /dev/null
+++ b/python/proton/_wrapper.py
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from cproton import pn_incref, pn_decref, \
+ pn_py2void, pn_void2py, \
+ pn_record_get, pn_record_def, pn_record_set, \
+ PN_PYREF
+
+
+class EmptyAttrs:
+
+ def __contains__(self, name):
+ return False
+
+ def __getitem__(self, name):
+ raise KeyError(name)
+
+ def __setitem__(self, name, value):
+ raise TypeError("does not support item assignment")
+
+
+EMPTY_ATTRS = EmptyAttrs()
+
+
+class Wrapper(object):
+
+ def __init__(self, impl_or_constructor, get_context=None):
+ init = False
+ if callable(impl_or_constructor):
+ # we are constructing a new object
+ impl = impl_or_constructor()
+ if impl is None:
+ self.__dict__["_impl"] = impl
+ self.__dict__["_attrs"] = EMPTY_ATTRS
+ self.__dict__["_record"] = None
+ from proton import ProtonException
+ raise ProtonException(
+ "Wrapper failed to create wrapped object. Check for file descriptor or memory exhaustion.")
+ init = True
+ else:
+ # we are wrapping an existing object
+ impl = impl_or_constructor
+ pn_incref(impl)
+
+ if get_context:
+ record = get_context(impl)
+ attrs = pn_void2py(pn_record_get(record, PYCTX))
+ if attrs is None:
+ attrs = {}
+ pn_record_def(record, PYCTX, PN_PYREF)
+ pn_record_set(record, PYCTX, pn_py2void(attrs))
+ init = True
+ else:
+ attrs = EMPTY_ATTRS
+ init = False
+ record = None
+ self.__dict__["_impl"] = impl
+ self.__dict__["_attrs"] = attrs
+ self.__dict__["_record"] = record
+ if init: self._init()
+
+ def __getattr__(self, name):
+ attrs = self.__dict__["_attrs"]
+ if name in attrs:
+ return attrs[name]
+ else:
+ raise AttributeError(name + " not in _attrs")
+
+ def __setattr__(self, name, value):
+ if hasattr(self.__class__, name):
+ object.__setattr__(self, name, value)
+ else:
+ attrs = self.__dict__["_attrs"]
+ attrs[name] = value
+
+ def __delattr__(self, name):
+ attrs = self.__dict__["_attrs"]
+ if attrs:
+ del attrs[name]
+
+ def __hash__(self):
+ return hash(addressof(self._impl))
+
+ def __eq__(self, other):
+ if isinstance(other, Wrapper):
+ return addressof(self._impl) == addressof(other._impl)
+ return False
+
+ def __ne__(self, other):
+ if isinstance(other, Wrapper):
+ return addressof(self._impl) != addressof(other._impl)
+ return True
+
+ def __del__(self):
+ pn_decref(self._impl)
+
+ def __repr__(self):
+ return '<%s.%s 0x%x ~ 0x%x>' % (self.__class__.__module__,
+ self.__class__.__name__,
+ id(self), addressof(self._impl))
+
+
+PYCTX = int(pn_py2void(Wrapper))
+addressof = int
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 76c9e51..1e61f44 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -16,28 +16,35 @@
# specific language governing permissions and limitations
# under the License.
#
-import heapq, logging, os, re, socket, time, types, weakref
-from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
-from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+from __future__ import absolute_import
+
+import logging
+import time
+import weakref
from select import select
+from proton import Delivery, Endpoint
+from proton import Message, Handler, ProtonException
+from ._events import dispatch
+
log = logging.getLogger("proton")
+
class OutgoingMessageHandler(Handler):
"""
A utility for simpler and more intuitive handling of delivery
events related to outgoing i.e. sent messages.
"""
+
def __init__(self, auto_settle=True, delegate=None):
self.auto_settle = auto_settle
self.delegate = delegate
def on_link_flow(self, event):
if event.link.is_sender and event.link.credit \
- and event.link.state & Endpoint.LOCAL_ACTIVE \
- and event.link.state & Endpoint.REMOTE_ACTIVE :
+ and event.link.state & Endpoint.LOCAL_ACTIVE \
+ and event.link.state & Endpoint.REMOTE_ACTIVE:
self.on_sendable(event)
def on_delivery(self, event):
@@ -94,23 +101,27 @@ class OutgoingMessageHandler(Handler):
if self.delegate != None:
dispatch(self.delegate, 'on_settled', event)
+
def recv_msg(delivery):
msg = Message()
msg.decode(delivery.link.recv(delivery.pending))
delivery.link.advance()
return msg
+
class Reject(ProtonException):
- """
- An exception that indicate a message should be rejected
- """
- pass
+ """
+ An exception that indicates a message should be rejected
+ """
+ pass
+
class Release(ProtonException):
- """
- An exception that indicate a message should be rejected
- """
- pass
+ """
+ An exception that indicates a message should be released
+ """
+ pass
+
class Acking(object):
def accept(self, delivery):
@@ -146,6 +157,7 @@ class Acking(object):
delivery.update(state)
delivery.settle()
+
class IncomingMessageHandler(Handler, Acking):
"""
A utility for simpler and more intuitive handling of delivery
@@ -202,6 +214,7 @@ class IncomingMessageHandler(Handler, Acking):
if self.delegate != None:
dispatch(self.delegate, 'on_aborted', event)
+
class EndpointStateHandler(Handler):
"""
A utility that exposes 'endpoint' events i.e. the open/close for
@@ -272,7 +285,7 @@ class EndpointStateHandler(Handler):
return
self.on_connection_error(event)
elif self.is_local_closed(event.connection):
- self.on_connection_closed(event)
+ self.on_connection_closed(event)
else:
self.on_connection_closing(event)
event.connection.close()
@@ -391,12 +404,14 @@ class EndpointStateHandler(Handler):
if self.delegate != None and event.connection and self.is_local_open(event.connection):
dispatch(self.delegate, 'on_disconnected', event)
+
class MessagingHandler(Handler, Acking):
"""
A general purpose handler that makes the proton-c events somewhat
simpler to deal with and/or avoids repetitive tasks for common use
cases.
"""
+
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
self.handlers = []
if prefetch:
@@ -414,7 +429,8 @@ class MessagingHandler(Handler, Acking):
"""
if event.transport.condition:
if event.transport.condition.info:
- log.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
+ log.error("%s: %s: %s" % (
+ event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
else:
log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
if event.transport.condition.name in self.fatal_conditions:
@@ -455,36 +471,43 @@ class MessagingHandler(Handler, Acking):
Called when the event loop starts. (Just an alias for on_reactor_init)
"""
pass
+
def on_connection_closed(self, event):
"""
Called when the connection is closed.
"""
pass
+
def on_session_closed(self, event):
"""
Called when the session is closed.
"""
pass
+
def on_link_closed(self, event):
"""
Called when the link is closed.
"""
pass
+
def on_connection_closing(self, event):
"""
Called when the peer initiates the closing of the connection.
"""
pass
+
def on_session_closing(self, event):
"""
Called when the peer initiates the closing of the session.
"""
pass
+
def on_link_closing(self, event):
"""
Called when the peer initiates the closing of the link.
"""
pass
+
def on_disconnected(self, event):
"""
Called when the socket is disconnected.
@@ -525,6 +548,7 @@ class MessagingHandler(Handler, Acking):
retransmitted.
"""
pass
+
def on_message(self, event):
"""
Called when a message is received. The message itself can be
@@ -535,11 +559,13 @@ class MessagingHandler(Handler, Acking):
"""
pass
+
class TransactionHandler(object):
"""
The interface for transaction handlers, i.e. objects that want to
be notified of state changes related to a transaction.
"""
+
def on_transaction_declared(self, event):
pass
@@ -555,6 +581,7 @@ class TransactionHandler(object):
def on_transaction_commit_failed(self, event):
pass
+
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
"""
An extension to the MessagingHandler for applications using
@@ -570,24 +597,29 @@ class TransactionalClientHandler(MessagingHandler, TransactionHandler):
else:
super(TransactionalClientHandler, self).accept(delivery)
-from proton import WrappedHandler
+
+from ._events import WrappedHandler
from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
+
class CFlowController(WrappedHandler):
def __init__(self, window=1024):
WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
+
class CHandshaker(WrappedHandler):
def __init__(self):
WrappedHandler.__init__(self, pn_handshaker)
+
class IOHandler(WrappedHandler):
def __init__(self):
WrappedHandler.__init__(self, pn_iohandler)
+
class PythonIO:
def __init__(self):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/reactor.py b/python/proton/reactor.py
index d5d5183..ccdbf94 100644
--- a/python/proton/reactor.py
+++ b/python/proton/reactor.py
@@ -17,22 +17,35 @@ from __future__ import absolute_import
# specific language governing permissions and limitations
# under the License.
#
-import logging, os, socket, time, types
-from heapq import heappush, heappop, nsmallest
+from __future__ import absolute_import
+import os
+import logging
import traceback
-from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
-from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
-from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
-from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
-from select import select
+from proton import Connection, Delivery, Described
+from proton import Endpoint, EventType, Handler, Link, Message
+from proton import Session, SSL, SSLDomain, SSLUnavailable, symbol
+from proton import Terminus, Transport, ulong, Url
from proton.handlers import OutgoingMessageHandler
-from proton import unicode2utf8, utf82unicode
-from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
-from .wrapper import Wrapper, PYCTX
-from cproton import *
+from proton import generate_uuid
+
+from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
+
+from ._events import EventBase
+from ._reactor_impl import Selectable, WrappedHandler, _chandler
+from ._wrapper import Wrapper, PYCTX
+
+from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
+ pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
+ pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
+ pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
+ pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
+ pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
+ pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
+ pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
+ pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
from . import _compat
@@ -40,6 +53,17 @@ from ._compat import queue
log = logging.getLogger("proton")
+
+def _timeout2millis(secs):
+ if secs is None: return PN_MILLIS_MAX
+ return secs2millis(secs)
+
+
+def _millis2timeout(millis):
+ if millis == PN_MILLIS_MAX: return None
+ return millis2secs(millis)
+
+
class Task(Wrapper):
@staticmethod
@@ -58,6 +82,7 @@ class Task(Wrapper):
def cancel(self):
pn_task_cancel(self._impl)
+
class Acceptor(Wrapper):
def __init__(self, impl):
@@ -69,6 +94,7 @@ class Acceptor(Wrapper):
def close(self):
pn_acceptor_close(self._impl)
+
class Reactor(Wrapper):
@staticmethod
@@ -95,11 +121,12 @@ class Reactor(Wrapper):
# error will always be generated from a callback from this reactor.
# Needed to prevent reference cycles and be compatible with wrappers.
class ErrorDelegate(object):
- def __init__(self, reactor):
- self.reactor_impl = reactor._impl
- def on_error(self, info):
- ractor = Reactor.wrap(self.reactor_impl)
- ractor.on_error(info)
+ def __init__(self, reactor):
+ self.reactor_impl = reactor._impl
+
+ def on_error(self, info):
+ ractor = Reactor.wrap(self.reactor_impl)
+ ractor.on_error(info)
def on_error_delegate(self):
return Reactor.ErrorDelegate(self).on_error
@@ -119,10 +146,10 @@ class Reactor(Wrapper):
global_handler = property(_get_global, _set_global)
def _get_timeout(self):
- return millis2timeout(pn_reactor_get_timeout(self._impl))
+ return _millis2timeout(pn_reactor_get_timeout(self._impl))
def _set_timeout(self, secs):
- return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
+ return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
timeout = property(_get_timeout, _set_timeout)
@@ -244,7 +271,9 @@ class Reactor(Wrapper):
def push_event(self, obj, etype):
pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-from proton import wrappers as _wrappers
+
+from ._events import wrappers as _wrappers
+
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
@@ -258,6 +287,7 @@ class EventInjector(object):
it. The close() method should be called when it is no longer
needed, to allow the event loop to end if needed.
"""
+
def __init__(self):
self.queue = queue.Queue()
self.pipe = os.pipe()
@@ -305,6 +335,7 @@ class ApplicationEvent(EventBase):
Application defined event, which can optionally be associated with
an engine object and/or an arbitrary subject
"""
+
def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
self.connection = connection
@@ -323,10 +354,12 @@ class ApplicationEvent(EventBase):
objects = [self.connection, self.session, self.link, self.delivery, self.subject]
return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
+
class Transaction(object):
"""
Class to track state of an AMQP 1.0 transaction.
"""
+
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
self.txn_ctrl = txn_ctrl
self.handler = handler
@@ -397,7 +430,7 @@ class Transaction(object):
if event.delivery.remote_state == Delivery.REJECTED:
if not self.failed:
self.handler.on_transaction_commit_failed(event)
- self._release_pending() # make this optional?
+ self._release_pending() # make this optional?
else:
if self.failed:
self.handler.on_transaction_aborted(event)
@@ -406,16 +439,19 @@ class Transaction(object):
self.handler.on_transaction_committed(event)
self._clear_pending()
+
class LinkOption(object):
"""
Abstract interface for link configuration options
"""
+
def apply(self, link):
"""
Subclasses will implement any configuration logic in this
method
"""
pass
+
def test(self, link):
"""
Subclasses can override this to selectively apply an option
@@ -423,23 +459,30 @@ class LinkOption(object):
"""
return True
+
class AtMostOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_SETTLED
+
class AtLeastOnce(LinkOption):
def apply(self, link):
link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_FIRST
+
class SenderOption(LinkOption):
def apply(self, sender): pass
+
def test(self, link): return link.is_sender
+
class ReceiverOption(LinkOption):
def apply(self, receiver): pass
+
def test(self, link): return link.is_receiver
+
class DynamicNodeProperties(LinkOption):
def __init__(self, props={}):
self.properties = {}
@@ -455,6 +498,7 @@ class DynamicNodeProperties(LinkOption):
else:
link.target.properties.put_dict(self.properties)
+
class Filter(ReceiverOption):
def __init__(self, filter_set={}):
self.filter_set = filter_set
@@ -462,26 +506,32 @@ class Filter(ReceiverOption):
def apply(self, receiver):
receiver.source.filter.put_dict(self.filter_set)
+
class Selector(Filter):
"""
Configures a link with a message selector filter
"""
+
def __init__(self, value, name='selector'):
super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
+
class DurableSubscription(ReceiverOption):
def apply(self, receiver):
receiver.source.durability = Terminus.DELIVERIES
receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
+
class Move(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
+
class Copy(ReceiverOption):
def apply(self, receiver):
receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
+
def _apply_link_options(options, link):
if options:
if isinstance(options, list):
@@ -490,6 +540,7 @@ def _apply_link_options(options, link):
else:
if options.test(link): options.apply(link)
+
def _create_session(connection, handler=None):
session = connection.session()
session.open()
@@ -502,6 +553,7 @@ def _get_attr(target, name):
else:
return None
+
class SessionPerConnection(object):
def __init__(self):
self._default_session = None
@@ -511,11 +563,13 @@ class SessionPerConnection(object):
self._default_session = _create_session(connection)
return self._default_session
+
class GlobalOverrides(object):
"""
Internal handler that triggers the necessary socket connect for an
opened connection.
"""
+
def __init__(self, base):
self.base = base
@@ -527,11 +581,13 @@ class GlobalOverrides(object):
conn = event.connection
return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
+
class Connector(Handler):
"""
Internal handler that triggers the necessary socket connect for an
opened connection.
"""
+
def __init__(self, connection):
self.connection = connection
self.address = None
@@ -548,7 +604,7 @@ class Connector(Handler):
self.max_frame_size = None
def _connect(self, connection, reactor):
- assert(reactor is not None)
+ assert (reactor is not None)
url = self.address.next()
reactor.set_connection_host(connection, url.host, str(url.port))
# if virtual-host not set, use host from address as default
@@ -615,11 +671,13 @@ class Connector(Handler):
def on_timer_task(self, event):
self._connect(self.connection, event.reactor)
+
class Backoff(object):
"""
A reconnect strategy involving an increasing delay between
retries, up to a maximum of 10 seconds.
"""
+
def __init__(self):
self.delay = 0
@@ -631,9 +689,10 @@ class Backoff(object):
if current == 0:
self.delay = 0.1
else:
- self.delay = min(10, 2*current)
+ self.delay = min(10, 2 * current)
return current
+
class Urls(object):
def __init__(self, values):
self.values = [Url(v) for v in values]
@@ -649,6 +708,7 @@ class Urls(object):
self.i = iter(self.values)
return next(self.i)
+
class SSLConfig(object):
def __init__(self):
self.client = SSLDomain(SSLDomain.MODE_CLIENT)
@@ -670,6 +730,7 @@ class Container(Reactor):
an extension to the Reactor class that adds convenience methods
for creating connections and sender- or receiver- links.
"""
+
def __init__(self, *handlers, **kwargs):
super(Container, self).__init__(*handlers, **kwargs)
if "impl" not in kwargs:
@@ -687,7 +748,8 @@ class Container(Reactor):
self.password = None
Wrapper.__setattr__(self, 'subclass', self.__class__)
- def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
+ **kwargs):
"""
Initiates the establishment of an AMQP connection. Returns an
instance of proton.Connection.
@@ -748,10 +810,14 @@ class Container(Reactor):
connector.max_frame_size = kwargs.get('max_frame_size')
conn._overrides = connector
- if url: connector.address = Urls([url])
- elif urls: connector.address = Urls(urls)
- elif address: connector.address = address
- else: raise ValueError("One of url, urls or address required")
+ if url:
+ connector.address = Urls([url])
+ elif urls:
+ connector.address = Urls(urls)
+ elif address:
+ connector.address = address
+ else:
+ raise ValueError("One of url, urls or address required")
if heartbeat:
connector.heartbeat = heartbeat
if reconnect:
@@ -761,15 +827,19 @@ class Container(Reactor):
# use container's default client domain if none specified. This is
# only necessary if the URL specifies the "amqps:" scheme
connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
- conn._session_policy = SessionPerConnection() #todo: make configurable
+ conn._session_policy = SessionPerConnection() # todo: make configurable
conn.open()
return conn
def _get_id(self, container, remote, local):
- if local and remote: "%s-%s-%s" % (container, remote, local)
- elif local: return "%s-%s" % (container, local)
- elif remote: return "%s-%s" % (container, remote)
- else: return "%s-%s" % (container, str(generate_uuid()))
+ if local and remote:
+ "%s-%s-%s" % (container, remote, local)
+ elif local:
+ return "%s-%s" % (container, local)
+ elif remote:
+ return "%s-%s" % (container, remote)
+ else:
+ return "%s-%s" % (container, str(generate_uuid()))
def _get_session(self, context):
if isinstance(context, Url):
@@ -806,7 +876,7 @@ class Container(Reactor):
Various LinkOptions can be specified to further control the
attachment.
"""
- if isinstance(context, _compat.string_types):
+ if isstring(context):
context = Url(context)
if isinstance(context, Url) and not target:
target = context.path
@@ -847,7 +917,7 @@ class Container(Reactor):
Various LinkOptions can be specified to further control the
attachment.
"""
- if isinstance(context, _compat.string_types):
+ if isstring(context):
context = Url(context)
if isinstance(context, Url) and not source:
source = context.path
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/python/proton/utils.py b/python/proton/utils.py
index 1d052d0..c6f8cb4 100644
--- a/python/proton/utils.py
+++ b/python/proton/utils.py
@@ -38,7 +38,8 @@ class BlockingLink(object):
self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
timeout=timeout,
msg="Opening link %s" % self.link.name)
- except Timeout as e: pass
+ except Timeout as e:
+ pass
self._checkClosed()
def _checkClosed(self):
@@ -53,31 +54,37 @@ class BlockingLink(object):
msg="Closing link %s" % self.link.name)
# Access to other link attributes.
- def __getattr__(self, name): return getattr(self.link, name)
+ def __getattr__(self, name):
+ return getattr(self.link, name)
+
class SendException(ProtonException):
"""
Exception used to indicate an exceptional state/condition on a send request
"""
+
def __init__(self, state):
self.state = state
+
def _is_settled(delivery):
return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
+
class BlockingSender(BlockingLink):
def __init__(self, connection, sender):
super(BlockingSender, self).__init__(connection, sender)
if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
- #this may be followed by a detach, which may contain an error condition, so wait a little...
+ # this may be followed by a detach, which may contain an error condition, so wait a little...
self._waitForClose()
- #...but close ourselves if peer does not
+ # ...but close ourselves if peer does not
self.link.close()
raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
def send(self, msg, timeout=False, error_states=None):
delivery = self.link.send(msg)
- self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout)
+ self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
+ timeout=timeout)
if delivery.link.snd_settle_mode != Link.SND_SETTLED:
delivery.settle()
bad = error_states
@@ -87,6 +94,7 @@ class BlockingSender(BlockingLink):
raise SendException(delivery.remote_state)
return delivery
+
class Fetcher(MessagingHandler):
def __init__(self, connection, prefetch):
super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
@@ -96,7 +104,7 @@ class Fetcher(MessagingHandler):
def on_message(self, event):
self.incoming.append((event.message, event.delivery))
- self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+ self.connection.container.yield_() # Wake up the wait() loop to handle the message.
def on_link_error(self, event):
if event.link.state & Endpoint.LOCAL_ACTIVE:
@@ -129,9 +137,9 @@ class BlockingReceiver(BlockingLink):
def __init__(self, connection, receiver, fetcher, credit=1):
super(BlockingReceiver, self).__init__(connection, receiver)
if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
- #this may be followed by a detach, which may contain an error condition, so wait a little...
+ # this may be followed by a detach, which may contain an error condition, so wait a little...
self._waitForClose()
- #...but close ourselves if peer does not
+ # ...but close ourselves if peer does not
self.link.close()
raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
if credit: receiver.flow(credit)
@@ -151,7 +159,8 @@ class BlockingReceiver(BlockingLink):
raise Exception("Can't call receive on this receiver as a handler was provided")
if not self.link.credit:
self.link.flow(1)
- self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
+ self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
+ timeout=timeout)
return self.fetcher.pop()
def accept(self):
@@ -210,6 +219,7 @@ class BlockingConnection(Handler):
object operations are enclosed in a try block and that close() is
always executed on exit.
"""
+
def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
self.disconnected = False
self.timeout = timeout or 60
@@ -221,7 +231,8 @@ class BlockingConnection(Handler):
self.closing = False
failed = True
try:
- self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
+ self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
+ heartbeat=heartbeat, **kwargs)
self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
msg="Opening connection")
failed = False
@@ -230,7 +241,8 @@ class BlockingConnection(Handler):
self.close()
def create_sender(self, address, handler=None, name=None, options=None):
- return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
+ return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
+ options=options))
def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
prefetch = credit
@@ -241,7 +253,9 @@ class BlockingConnection(Handler):
else:
fetcher = Fetcher(self, credit)
return BlockingReceiver(
- self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
+ self,
+ self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
+ options=options), fetcher, credit=prefetch)
def close(self):
# TODO: provide stronger interrupt protection on cleanup. See PEP 419
@@ -259,8 +273,8 @@ class BlockingConnection(Handler):
# Nothing left to block on. Allow reactor to clean up.
self.run()
self.conn = None
- self.container.global_handler = None # break circular ref: container to cadapter.on_error
- pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
+ self.container.global_handler = None # break circular ref: container to cadapter.on_error
+ pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
self.container = None
def _is_closed(self):
@@ -294,7 +308,7 @@ class BlockingConnection(Handler):
self.container.timeout = container_timeout
if self.disconnected or self._is_closed():
self.container.stop()
- self.conn.handler = None # break cyclical reference
+ self.conn.handler = None # break cyclical reference
if self.disconnected and not self._is_closed():
raise ConnectionException(
"Connection %s disconnected: %s" % (self.url, self.disconnected))
@@ -320,6 +334,7 @@ class BlockingConnection(Handler):
def on_transport_closed(self, event):
self.disconnected = event.transport.condition or "unknown"
+
class AtomicCount(object):
def __init__(self, start=0, step=1):
"""Thread-safe atomic counter. Start at start, increment by step."""
@@ -334,6 +349,7 @@ class AtomicCount(object):
self.lock.release()
return result
+
class SyncRequestResponse(IncomingMessageHandler):
"""
Implementation of the synchronous request-response (aka RPC) pattern.
@@ -374,12 +390,14 @@ class SyncRequestResponse(IncomingMessageHandler):
request.reply_to = self.reply_to
request.correlation_id = correlation_id = str(self.correlation_id.next())
self.sender.send(request)
+
def wakeup():
return self.response and (self.response.correlation_id == correlation_id)
+
self.connection.wait(wakeup, msg="Waiting for response")
response = self.response
- self.response = None # Ready for next response.
- self.receiver.flow(1) # Set up credit for the next response.
+ self.response = None # Ready for next response.
+ self.receiver.flow(1) # Set up credit for the next response.
return response
@property
@@ -390,4 +408,4 @@ class SyncRequestResponse(IncomingMessageHandler):
def on_message(self, event):
"""Called when we receive a message for our receiver."""
self.response = event.message
- self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+ self.connection.container.yield_() # Wake up the wait() loop to handle the message.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org