You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2018/05/25 22:21:51 UTC
[5/6] qpid-proton git commit: PROTON-1850: Split up proton
__init__.py into multiple files - Reformatted python source to (mostly) PEP-8
standards - Control what gets exported from __init__ by restricting what it
imports - Move most of the reactor implem
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/python/proton/__init__.py b/python/proton/__init__.py
index 6ee0d68..be8e247 100644
--- a/python/proton/__init__.py
+++ b/python/proton/__init__.py
@@ -30,54 +30,84 @@ The proton APIs consist of the following classes:
"""
from __future__ import absolute_import
-from cproton import *
-from .wrapper import Wrapper
-from . import _compat
-
import logging
-import socket
-import sys
-import threading
-import uuid
-import weakref
-
-# This private NullHandler is required for Python 2.6,
-# when we no longer support 2.6 this replace NullHandler class definition and assignment with:
-# handler = logging.NullHandler()
-class NullHandler(logging.Handler):
- def handle(self, record):
- pass
-
- def emit(self, record):
- pass
-
- def createLock(self):
- self.lock = None
-
-handler = NullHandler()
-
-log = logging.getLogger("proton")
-log.addHandler(handler)
-def generate_uuid():
- return uuid.uuid4()
+from cproton import PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT
-#
-# Hacks to provide Python2 <---> Python3 compatibility
-#
-# The results are
-# | |long|unicode|
-# |python2|long|unicode|
-# |python3| int| str|
-try:
- long()
-except NameError:
- long = int
-try:
- unicode()
-except NameError:
- unicode = str
+from ._condition import Condition
+from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \
+ byte, short, int32, float32, decimal32, decimal64, decimal128
+from ._delivery import Delivery, Disposition
+from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus
+from ._events import Collector, Event, EventType, Handler
+from ._exceptions import ProtonException, MessageException, DataException, TransportException, \
+ SSLException, SSLUnavailable, ConnectionException, SessionException, LinkException, Timeout, Interrupt
+from ._message import Message, ABORTED, ACCEPTED, PENDING, REJECTED, RELEASED, MODIFIED, SETTLED
+from ._transport import Transport, SASL, SSL, SSLDomain, SSLSessionDetails
+from ._url import Url
+__all__ = [
+ "API_LANGUAGE",
+ "IMPLEMENTATION_LANGUAGE",
+ "ABORTED",
+ "ACCEPTED",
+ "PENDING",
+ "REJECTED",
+ "RELEASED",
+ "MODIFIED",
+ "SETTLED",
+ "UNDESCRIBED",
+ "Array",
+ "Collector",
+ "Condition",
+ "Connection",
+ "Data",
+ "DataException",
+ "Delivery",
+ "Disposition",
+ "Described",
+ "Endpoint",
+ "Event",
+ "EventType",
+ "Handler",
+ "Link",
+ "LinkException",
+ "Message",
+ "MessageException",
+ "ProtonException",
+ "VERSION_MAJOR",
+ "VERSION_MINOR",
+ "Receiver",
+ "SASL",
+ "Sender",
+ "Session",
+ "SessionException",
+ "SSL",
+ "SSLDomain",
+ "SSLSessionDetails",
+ "SSLUnavailable",
+ "SSLException",
+ "Terminus",
+ "Timeout",
+ "Interrupt",
+ "Transport",
+ "TransportException",
+ "Url",
+ "char",
+ "symbol",
+ "timestamp",
+ "ulong",
+ "byte",
+ "short",
+ "int32",
+ "ubyte",
+ "ushort",
+ "uint",
+ "float32",
+ "decimal32",
+ "decimal64",
+ "decimal128"
+]
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
@@ -86,3603 +116,27 @@ VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
-class Constant(object):
-
- def __init__(self, name):
- self.name = name
-
- def __repr__(self):
- return self.name
-
-class ProtonException(Exception):
- """
- The root of the proton exception hierarchy. All proton exception
- classes derive from this exception.
- """
- pass
-
-class Timeout(ProtonException):
- """
- A timeout exception indicates that a blocking operation has timed
- out.
- """
- pass
-
-class Interrupt(ProtonException):
- """
- An interrupt exception indicates that a blocking operation was interrupted.
- """
- pass
-
-class MessageException(ProtonException):
- """
- The MessageException class is the root of the message exception
- hierarchy. All exceptions generated by the Message class derive from
- this exception.
- """
- pass
-
-EXCEPTIONS = {
- PN_TIMEOUT: Timeout,
- PN_INTR: Interrupt
- }
-
-PENDING = Constant("PENDING")
-ACCEPTED = Constant("ACCEPTED")
-REJECTED = Constant("REJECTED")
-RELEASED = Constant("RELEASED")
-MODIFIED = Constant("MODIFIED")
-ABORTED = Constant("ABORTED")
-SETTLED = Constant("SETTLED")
-
-STATUSES = {
- PN_STATUS_ABORTED: ABORTED,
- PN_STATUS_ACCEPTED: ACCEPTED,
- PN_STATUS_REJECTED: REJECTED,
- PN_STATUS_RELEASED: RELEASED,
- PN_STATUS_MODIFIED: MODIFIED,
- PN_STATUS_PENDING: PENDING,
- PN_STATUS_SETTLED: SETTLED,
- PN_STATUS_UNKNOWN: None
- }
-
-class Message(object):
- """The L{Message} class is a mutable holder of message content.
-
- @ivar instructions: delivery instructions for the message
- @type instructions: dict
- @ivar annotations: infrastructure defined message annotations
- @type annotations: dict
- @ivar properties: application defined message properties
- @type properties: dict
- @ivar body: message body
- @type body: bytes | unicode | dict | list | int | long | float | UUID
- """
-
- DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
-
- def __init__(self, body=None, **kwargs):
- """
- @param kwargs: Message property name/value pairs to initialise the Message
- """
- self._msg = pn_message()
- self._id = Data(pn_message_id(self._msg))
- self._correlation_id = Data(pn_message_correlation_id(self._msg))
- self.instructions = None
- self.annotations = None
- self.properties = None
- self.body = body
- for k,v in _compat.iteritems(kwargs):
- getattr(self, k) # Raise exception if it's not a valid attribute.
- setattr(self, k, v)
-
- def __del__(self):
- if hasattr(self, "_msg"):
- pn_message_free(self._msg)
- del self._msg
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, MessageException)
- raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
- else:
- return err
-
- def _check_property_keys(self):
- for k in self.properties.keys():
- if isinstance(k, unicode):
- # py2 unicode, py3 str (via hack definition)
- continue
- # If key is binary then change to string
- elif isinstance(k, str):
- # py2 str
- self.properties[k.encode('utf-8')] = self.properties.pop(k)
- else:
- raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
-
- def _pre_encode(self):
- inst = Data(pn_message_instructions(self._msg))
- ann = Data(pn_message_annotations(self._msg))
- props = Data(pn_message_properties(self._msg))
- body = Data(pn_message_body(self._msg))
-
- inst.clear()
- if self.instructions is not None:
- inst.put_object(self.instructions)
- ann.clear()
- if self.annotations is not None:
- ann.put_object(self.annotations)
- props.clear()
- if self.properties is not None:
- self._check_property_keys()
- props.put_object(self.properties)
- body.clear()
- if self.body is not None:
- body.put_object(self.body)
-
- def _post_decode(self):
- inst = Data(pn_message_instructions(self._msg))
- ann = Data(pn_message_annotations(self._msg))
- props = Data(pn_message_properties(self._msg))
- body = Data(pn_message_body(self._msg))
-
- if inst.next():
- self.instructions = inst.get_object()
- else:
- self.instructions = None
- if ann.next():
- self.annotations = ann.get_object()
- else:
- self.annotations = None
- if props.next():
- self.properties = props.get_object()
- else:
- self.properties = None
- if body.next():
- self.body = body.get_object()
- else:
- self.body = None
-
- def clear(self):
- """
- Clears the contents of the L{Message}. All fields will be reset to
- their default values.
- """
- pn_message_clear(self._msg)
- self.instructions = None
- self.annotations = None
- self.properties = None
- self.body = None
-
- def _is_inferred(self):
- return pn_message_is_inferred(self._msg)
-
- def _set_inferred(self, value):
- self._check(pn_message_set_inferred(self._msg, bool(value)))
-
- inferred = property(_is_inferred, _set_inferred, doc="""
-The inferred flag for a message indicates how the message content
-is encoded into AMQP sections. If inferred is true then binary and
-list values in the body of the message will be encoded as AMQP DATA
-and AMQP SEQUENCE sections, respectively. If inferred is false,
-then all values in the body of the message will be encoded as AMQP
-VALUE sections regardless of their type.
-""")
-
- def _is_durable(self):
- return pn_message_is_durable(self._msg)
-
- def _set_durable(self, value):
- self._check(pn_message_set_durable(self._msg, bool(value)))
-
- durable = property(_is_durable, _set_durable,
- doc="""
-The durable property indicates that the message should be held durably
-by any intermediaries taking responsibility for the message.
-""")
-
- def _get_priority(self):
- return pn_message_get_priority(self._msg)
-
- def _set_priority(self, value):
- self._check(pn_message_set_priority(self._msg, value))
-
- priority = property(_get_priority, _set_priority,
- doc="""
-The priority of the message.
-""")
-
- def _get_ttl(self):
- return millis2secs(pn_message_get_ttl(self._msg))
-
- def _set_ttl(self, value):
- self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
-
- ttl = property(_get_ttl, _set_ttl,
- doc="""
-The time to live of the message measured in seconds. Expired messages
-may be dropped.
-""")
-
- def _is_first_acquirer(self):
- return pn_message_is_first_acquirer(self._msg)
-
- def _set_first_acquirer(self, value):
- self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
-
- first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
- doc="""
-True iff the recipient is the first to acquire the message.
-""")
-
- def _get_delivery_count(self):
- return pn_message_get_delivery_count(self._msg)
-
- def _set_delivery_count(self, value):
- self._check(pn_message_set_delivery_count(self._msg, value))
-
- delivery_count = property(_get_delivery_count, _set_delivery_count,
- doc="""
-The number of delivery attempts made for this message.
-""")
-
-
- def _get_id(self):
- return self._id.get_object()
- def _set_id(self, value):
- if type(value) in (int, long):
- value = ulong(value)
- self._id.rewind()
- self._id.put_object(value)
- id = property(_get_id, _set_id,
- doc="""
-The id of the message.
-""")
-
- def _get_user_id(self):
- return pn_message_get_user_id(self._msg)
-
- def _set_user_id(self, value):
- self._check(pn_message_set_user_id(self._msg, value))
-
- user_id = property(_get_user_id, _set_user_id,
- doc="""
-The user id of the message creator.
-""")
-
- def _get_address(self):
- return utf82unicode(pn_message_get_address(self._msg))
-
- def _set_address(self, value):
- self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
-
- address = property(_get_address, _set_address,
- doc="""
-The address of the message.
-""")
-
- def _get_subject(self):
- return utf82unicode(pn_message_get_subject(self._msg))
-
- def _set_subject(self, value):
- self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
-
- subject = property(_get_subject, _set_subject,
- doc="""
-The subject of the message.
-""")
-
- def _get_reply_to(self):
- return utf82unicode(pn_message_get_reply_to(self._msg))
-
- def _set_reply_to(self, value):
- self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
-
- reply_to = property(_get_reply_to, _set_reply_to,
- doc="""
-The reply-to address for the message.
-""")
-
- def _get_correlation_id(self):
- return self._correlation_id.get_object()
- def _set_correlation_id(self, value):
- if type(value) in (int, long):
- value = ulong(value)
- self._correlation_id.rewind()
- self._correlation_id.put_object(value)
-
- correlation_id = property(_get_correlation_id, _set_correlation_id,
- doc="""
-The correlation-id for the message.
-""")
-
- def _get_content_type(self):
- return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
-
- def _set_content_type(self, value):
- self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
-
- content_type = property(_get_content_type, _set_content_type,
- doc="""
-The content-type of the message.
-""")
-
- def _get_content_encoding(self):
- return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
-
- def _set_content_encoding(self, value):
- self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
-
- content_encoding = property(_get_content_encoding, _set_content_encoding,
- doc="""
-The content-encoding of the message.
-""")
-
- def _get_expiry_time(self):
- return millis2secs(pn_message_get_expiry_time(self._msg))
-
- def _set_expiry_time(self, value):
- self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
-
- expiry_time = property(_get_expiry_time, _set_expiry_time,
- doc="""
-The expiry time of the message.
-""")
-
- def _get_creation_time(self):
- return millis2secs(pn_message_get_creation_time(self._msg))
-
- def _set_creation_time(self, value):
- self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
-
- creation_time = property(_get_creation_time, _set_creation_time,
- doc="""
-The creation time of the message.
-""")
-
- def _get_group_id(self):
- return utf82unicode(pn_message_get_group_id(self._msg))
-
- def _set_group_id(self, value):
- self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
-
- group_id = property(_get_group_id, _set_group_id,
- doc="""
-The group id of the message.
-""")
-
- def _get_group_sequence(self):
- return pn_message_get_group_sequence(self._msg)
-
- def _set_group_sequence(self, value):
- self._check(pn_message_set_group_sequence(self._msg, value))
-
- group_sequence = property(_get_group_sequence, _set_group_sequence,
- doc="""
-The sequence of the message within its group.
-""")
-
- def _get_reply_to_group_id(self):
- return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
-
- def _set_reply_to_group_id(self, value):
- self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
-
- reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
- doc="""
-The group-id for any replies.
-""")
-
- def encode(self):
- self._pre_encode()
- sz = 16
- while True:
- err, data = pn_message_encode(self._msg, sz)
- if err == PN_OVERFLOW:
- sz *= 2
- continue
- else:
- self._check(err)
- return data
-
- def decode(self, data):
- self._check(pn_message_decode(self._msg, data))
- self._post_decode()
-
- def send(self, sender, tag=None):
- dlv = sender.delivery(tag or sender.delivery_tag())
- encoded = self.encode()
- sender.stream(encoded)
- sender.advance()
- if sender.snd_settle_mode == Link.SND_SETTLED:
- dlv.settle()
- return dlv
-
- def recv(self, link):
- """
- Receives and decodes the message content for the current delivery
- from the link. Upon success it will return the current delivery
- for the link. If there is no current delivery, or if the current
- delivery is incomplete, or if the link is not a receiver, it will
- return None.
-
- @type link: Link
- @param link: the link to receive a message from
- @return the delivery associated with the decoded message (or None)
-
- """
- if link.is_sender: return None
- dlv = link.current
- if not dlv or dlv.partial: return None
- dlv.encoded = link.recv(dlv.pending)
- link.advance()
- # the sender has already forgotten about the delivery, so we might
- # as well too
- if link.remote_snd_settle_mode == Link.SND_SETTLED:
- dlv.settle()
- self.decode(dlv.encoded)
- return dlv
-
- def __repr2__(self):
- props = []
- for attr in ("inferred", "address", "reply_to", "durable", "ttl",
- "priority", "first_acquirer", "delivery_count", "id",
- "correlation_id", "user_id", "group_id", "group_sequence",
- "reply_to_group_id", "instructions", "annotations",
- "properties", "body"):
- value = getattr(self, attr)
- if value: props.append("%s=%r" % (attr, value))
- return "Message(%s)" % ", ".join(props)
-
- def __repr__(self):
- tmp = pn_string(None)
- err = pn_inspect(self._msg, tmp)
- result = pn_string_get(tmp)
- pn_free(tmp)
- self._check(err)
- return result
-
-_DEFAULT = object()
-
-class Selectable(Wrapper):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Selectable(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_selectable_attachments)
-
- def _init(self):
- pass
-
- def fileno(self, fd = _DEFAULT):
- if fd is _DEFAULT:
- return pn_selectable_get_fd(self._impl)
- elif fd is None:
- pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
- else:
- pn_selectable_set_fd(self._impl, fd)
-
- def _is_reading(self):
- return pn_selectable_is_reading(self._impl)
-
- def _set_reading(self, val):
- pn_selectable_set_reading(self._impl, bool(val))
-
- reading = property(_is_reading, _set_reading)
-
- def _is_writing(self):
- return pn_selectable_is_writing(self._impl)
-
- def _set_writing(self, val):
- pn_selectable_set_writing(self._impl, bool(val))
-
- writing = property(_is_writing, _set_writing)
-
- def _get_deadline(self):
- tstamp = pn_selectable_get_deadline(self._impl)
- if tstamp:
- return millis2secs(tstamp)
- else:
- return None
-
- def _set_deadline(self, deadline):
- pn_selectable_set_deadline(self._impl, secs2millis(deadline))
-
- deadline = property(_get_deadline, _set_deadline)
-
- def readable(self):
- pn_selectable_readable(self._impl)
-
- def writable(self):
- pn_selectable_writable(self._impl)
-
- def expired(self):
- pn_selectable_expired(self._impl)
-
- def _is_registered(self):
- return pn_selectable_is_registered(self._impl)
-
- def _set_registered(self, registered):
- pn_selectable_set_registered(self._impl, registered)
-
- registered = property(_is_registered, _set_registered,
- doc="""
-The registered property may be get/set by an I/O polling system to
-indicate whether the fd has been registered or not.
-""")
-
- @property
- def is_terminal(self):
- return pn_selectable_is_terminal(self._impl)
-
- def terminate(self):
- pn_selectable_terminate(self._impl)
-
- def release(self):
- pn_selectable_release(self._impl)
-
-class DataException(ProtonException):
- """
- The DataException class is the root of the Data exception hierarchy.
- All exceptions raised by the Data class extend this exception.
- """
- pass
-
-class UnmappedType:
-
- def __init__(self, msg):
- self.msg = msg
-
- def __repr__(self):
- return "UnmappedType(%s)" % self.msg
-class ulong(long):
-
- def __repr__(self):
- return "ulong(%s)" % long.__repr__(self)
-
-class timestamp(long):
-
- def __repr__(self):
- return "timestamp(%s)" % long.__repr__(self)
-
-class symbol(unicode):
-
- def __repr__(self):
- return "symbol(%s)" % unicode.__repr__(self)
-
-class char(unicode):
-
- def __repr__(self):
- return "char(%s)" % unicode.__repr__(self)
-
-class byte(int):
-
- def __repr__(self):
- return "byte(%s)" % int.__repr__(self)
-
-class short(int):
-
- def __repr__(self):
- return "short(%s)" % int.__repr__(self)
-
-class int32(int):
-
- def __repr__(self):
- return "int32(%s)" % int.__repr__(self)
-
-class ubyte(int):
-
- def __repr__(self):
- return "ubyte(%s)" % int.__repr__(self)
-
-class ushort(int):
-
- def __repr__(self):
- return "ushort(%s)" % int.__repr__(self)
-
-class uint(long):
-
- def __repr__(self):
- return "uint(%s)" % long.__repr__(self)
-
-class float32(float):
-
- def __repr__(self):
- return "float32(%s)" % float.__repr__(self)
-
-class decimal32(int):
-
- def __repr__(self):
- return "decimal32(%s)" % int.__repr__(self)
-
-class decimal64(long):
-
- def __repr__(self):
- return "decimal64(%s)" % long.__repr__(self)
-
-class decimal128(bytes):
-
- def __repr__(self):
- return "decimal128(%s)" % bytes.__repr__(self)
-
-class Described(object):
-
- def __init__(self, descriptor, value):
- self.descriptor = descriptor
- self.value = value
-
- def __repr__(self):
- return "Described(%r, %r)" % (self.descriptor, self.value)
-
- def __eq__(self, o):
- if isinstance(o, Described):
- return self.descriptor == o.descriptor and self.value == o.value
- else:
- return False
-
-UNDESCRIBED = Constant("UNDESCRIBED")
-
-class Array(object):
-
- def __init__(self, descriptor, type, *elements):
- self.descriptor = descriptor
- self.type = type
- self.elements = elements
-
- def __iter__(self):
- return iter(self.elements)
-
- def __repr__(self):
- if self.elements:
- els = ", %s" % (", ".join(map(repr, self.elements)))
- else:
- els = ""
- return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
-
- def __eq__(self, o):
- if isinstance(o, Array):
- return self.descriptor == o.descriptor and \
- self.type == o.type and self.elements == o.elements
- else:
- return False
-
-class Data:
- """
- The L{Data} class provides an interface for decoding, extracting,
- creating, and encoding arbitrary AMQP data. A L{Data} object
- contains a tree of AMQP values. Leaf nodes in this tree correspond
- to scalars in the AMQP type system such as L{ints<INT>} or
- L{strings<STRING>}. Non-leaf nodes in this tree correspond to
- compound values in the AMQP type system such as L{lists<LIST>},
- L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
- The root node of the tree is the L{Data} object itself and can have
- an arbitrary number of children.
-
- A L{Data} object maintains the notion of the current sibling node
- and a current parent node. Siblings are ordered within their parent.
- Values are accessed and/or added by using the L{next}, L{prev},
- L{enter}, and L{exit} methods to navigate to the desired location in
- the tree and using the supplied variety of put_*/get_* methods to
- access or add a value of the desired type.
-
- The put_* methods will always add a value I{after} the current node
- in the tree. If the current node has a next sibling the put_* method
- will overwrite the value on this node. If there is no current node
- or the current node has no next sibling then one will be added. The
- put_* methods always set the added/modified node to the current
- node. The get_* methods read the value of the current node and do
- not change which node is current.
-
- The following types of scalar values are supported:
-
- - L{NULL}
- - L{BOOL}
- - L{UBYTE}
- - L{USHORT}
- - L{SHORT}
- - L{UINT}
- - L{INT}
- - L{ULONG}
- - L{LONG}
- - L{FLOAT}
- - L{DOUBLE}
- - L{BINARY}
- - L{STRING}
- - L{SYMBOL}
-
- The following types of compound values are supported:
-
- - L{DESCRIBED}
- - L{ARRAY}
- - L{LIST}
- - L{MAP}
- """
-
- NULL = PN_NULL; "A null value."
- BOOL = PN_BOOL; "A boolean value."
- UBYTE = PN_UBYTE; "An unsigned byte value."
- BYTE = PN_BYTE; "A signed byte value."
- USHORT = PN_USHORT; "An unsigned short value."
- SHORT = PN_SHORT; "A short value."
- UINT = PN_UINT; "An unsigned int value."
- INT = PN_INT; "A signed int value."
- CHAR = PN_CHAR; "A character value."
- ULONG = PN_ULONG; "An unsigned long value."
- LONG = PN_LONG; "A signed long value."
- TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
- FLOAT = PN_FLOAT; "A float value."
- DOUBLE = PN_DOUBLE; "A double value."
- DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
- DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
- DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
- UUID = PN_UUID; "A UUID value."
- BINARY = PN_BINARY; "A binary string."
- STRING = PN_STRING; "A unicode string."
- SYMBOL = PN_SYMBOL; "A symbolic string."
- DESCRIBED = PN_DESCRIBED; "A described value."
- ARRAY = PN_ARRAY; "An array value."
- LIST = PN_LIST; "A list value."
- MAP = PN_MAP; "A map value."
-
- type_names = {
- NULL: "null",
- BOOL: "bool",
- BYTE: "byte",
- UBYTE: "ubyte",
- SHORT: "short",
- USHORT: "ushort",
- INT: "int",
- UINT: "uint",
- CHAR: "char",
- LONG: "long",
- ULONG: "ulong",
- TIMESTAMP: "timestamp",
- FLOAT: "float",
- DOUBLE: "double",
- DECIMAL32: "decimal32",
- DECIMAL64: "decimal64",
- DECIMAL128: "decimal128",
- UUID: "uuid",
- BINARY: "binary",
- STRING: "string",
- SYMBOL: "symbol",
- DESCRIBED: "described",
- ARRAY: "array",
- LIST: "list",
- MAP: "map"
- }
-
- @classmethod
- def type_name(type): return Data.type_names[type]
-
- def __init__(self, capacity=16):
- if type(capacity) in (int, long):
- self._data = pn_data(capacity)
- self._free = True
- else:
- self._data = capacity
- self._free = False
-
- def __del__(self):
- if self._free and hasattr(self, "_data"):
- pn_data_free(self._data)
- del self._data
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, DataException)
- raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
- else:
- return err
-
- def clear(self):
- """
- Clears the data object.
- """
- pn_data_clear(self._data)
-
- def rewind(self):
- """
- Clears current node and sets the parent to the root node. Clearing the
- current node sets it _before_ the first node, calling next() will advance to
- the first node.
- """
- assert self._data is not None
- pn_data_rewind(self._data)
-
- def next(self):
- """
- Advances the current node to its next sibling and returns its
- type. If there is no next sibling the current node remains
- unchanged and None is returned.
- """
- found = pn_data_next(self._data)
- if found:
- return self.type()
- else:
- return None
-
- def prev(self):
- """
- Advances the current node to its previous sibling and returns its
- type. If there is no previous sibling the current node remains
- unchanged and None is returned.
- """
- found = pn_data_prev(self._data)
- if found:
- return self.type()
- else:
- return None
-
- def enter(self):
- """
- Sets the parent node to the current node and clears the current node.
- Clearing the current node sets it _before_ the first child,
- call next() advances to the first child.
- """
- return pn_data_enter(self._data)
-
- def exit(self):
- """
- Sets the current node to the parent node and the parent node to
- its own parent.
- """
- return pn_data_exit(self._data)
-
- def lookup(self, name):
- return pn_data_lookup(self._data, name)
-
- def narrow(self):
- pn_data_narrow(self._data)
-
- def widen(self):
- pn_data_widen(self._data)
-
- def type(self):
- """
- Returns the type of the current node.
- """
- dtype = pn_data_type(self._data)
- if dtype == -1:
- return None
- else:
- return dtype
-
- def encoded_size(self):
- """
- Returns the size in bytes needed to encode the data in AMQP format.
- """
- return pn_data_encoded_size(self._data)
-
- def encode(self):
- """
- Returns a representation of the data encoded in AMQP format.
- """
- size = 1024
- while True:
- cd, enc = pn_data_encode(self._data, size)
- if cd == PN_OVERFLOW:
- size *= 2
- elif cd >= 0:
- return enc
- else:
- self._check(cd)
-
- def decode(self, encoded):
- """
- Decodes the first value from supplied AMQP data and returns the
- number of bytes consumed.
-
- @type encoded: binary
- @param encoded: AMQP encoded binary data
- """
- return self._check(pn_data_decode(self._data, encoded))
-
- def put_list(self):
- """
- Puts a list value. Elements may be filled by entering the list
- node and putting element values.
-
- >>> data = Data()
- >>> data.put_list()
- >>> data.enter()
- >>> data.put_int(1)
- >>> data.put_int(2)
- >>> data.put_int(3)
- >>> data.exit()
- """
- self._check(pn_data_put_list(self._data))
-
- def put_map(self):
- """
- Puts a map value. Elements may be filled by entering the map node
- and putting alternating key value pairs.
-
- >>> data = Data()
- >>> data.put_map()
- >>> data.enter()
- >>> data.put_string("key")
- >>> data.put_string("value")
- >>> data.exit()
- """
- self._check(pn_data_put_map(self._data))
-
- def put_array(self, described, element_type):
- """
- Puts an array value. Elements may be filled by entering the array
- node and putting the element values. The values must all be of the
- specified array element type. If an array is described then the
- first child value of the array is the descriptor and may be of any
- type.
-
- >>> data = Data()
- >>>
- >>> data.put_array(False, Data.INT)
- >>> data.enter()
- >>> data.put_int(1)
- >>> data.put_int(2)
- >>> data.put_int(3)
- >>> data.exit()
- >>>
- >>> data.put_array(True, Data.DOUBLE)
- >>> data.enter()
- >>> data.put_symbol("array-descriptor")
- >>> data.put_double(1.1)
- >>> data.put_double(1.2)
- >>> data.put_double(1.3)
- >>> data.exit()
-
- @type described: bool
- @param described: specifies whether the array is described
- @type element_type: int
- @param element_type: the type of the array elements
- """
- self._check(pn_data_put_array(self._data, described, element_type))
-
- def put_described(self):
- """
- Puts a described value. A described node has two children, the
- descriptor and the value. These are specified by entering the node
- and putting the desired values.
-
- >>> data = Data()
- >>> data.put_described()
- >>> data.enter()
- >>> data.put_symbol("value-descriptor")
- >>> data.put_string("the value")
- >>> data.exit()
- """
- self._check(pn_data_put_described(self._data))
-
- def put_null(self):
- """
- Puts a null value.
- """
- self._check(pn_data_put_null(self._data))
-
- def put_bool(self, b):
- """
- Puts a boolean value.
-
- @param b: a boolean value
- """
- self._check(pn_data_put_bool(self._data, b))
-
- def put_ubyte(self, ub):
- """
- Puts an unsigned byte value.
-
- @param ub: an integral value
- """
- self._check(pn_data_put_ubyte(self._data, ub))
-
- def put_byte(self, b):
- """
- Puts a signed byte value.
-
- @param b: an integral value
- """
- self._check(pn_data_put_byte(self._data, b))
-
- def put_ushort(self, us):
- """
- Puts an unsigned short value.
-
- @param us: an integral value.
- """
- self._check(pn_data_put_ushort(self._data, us))
-
- def put_short(self, s):
- """
- Puts a signed short value.
-
- @param s: an integral value
- """
- self._check(pn_data_put_short(self._data, s))
-
- def put_uint(self, ui):
- """
- Puts an unsigned int value.
-
- @param ui: an integral value
- """
- self._check(pn_data_put_uint(self._data, ui))
-
- def put_int(self, i):
- """
- Puts a signed int value.
-
- @param i: an integral value
- """
- self._check(pn_data_put_int(self._data, i))
-
- def put_char(self, c):
- """
- Puts a char value.
-
- @param c: a single character
- """
- self._check(pn_data_put_char(self._data, ord(c)))
-
- def put_ulong(self, ul):
- """
- Puts an unsigned long value.
-
- @param ul: an integral value
- """
- self._check(pn_data_put_ulong(self._data, ul))
-
- def put_long(self, l):
- """
- Puts a signed long value.
-
- @param l: an integral value
- """
- self._check(pn_data_put_long(self._data, l))
-
- def put_timestamp(self, t):
- """
- Puts a timestamp value.
-
- @param t: an integral value
- """
- self._check(pn_data_put_timestamp(self._data, t))
-
- def put_float(self, f):
- """
- Puts a float value.
-
- @param f: a floating point value
- """
- self._check(pn_data_put_float(self._data, f))
-
- def put_double(self, d):
- """
- Puts a double value.
-
- @param d: a floating point value.
- """
- self._check(pn_data_put_double(self._data, d))
-
- def put_decimal32(self, d):
- """
- Puts a decimal32 value.
-
- @param d: a decimal32 value
- """
- self._check(pn_data_put_decimal32(self._data, d))
-
- def put_decimal64(self, d):
- """
- Puts a decimal64 value.
-
- @param d: a decimal64 value
- """
- self._check(pn_data_put_decimal64(self._data, d))
-
- def put_decimal128(self, d):
- """
- Puts a decimal128 value.
-
- @param d: a decimal128 value
- """
- self._check(pn_data_put_decimal128(self._data, d))
-
- def put_uuid(self, u):
- """
- Puts a UUID value.
-
- @param u: a uuid value
- """
- self._check(pn_data_put_uuid(self._data, u.bytes))
-
- def put_binary(self, b):
- """
- Puts a binary value.
-
- @type b: binary
- @param b: a binary value
- """
- self._check(pn_data_put_binary(self._data, b))
-
- def put_memoryview(self, mv):
- """Put a python memoryview object as an AMQP binary value"""
- self.put_binary(mv.tobytes())
-
- def put_buffer(self, buff):
- """Put a python buffer object as an AMQP binary value"""
- self.put_binary(bytes(buff))
-
- def put_string(self, s):
- """
- Puts a unicode value.
-
- @type s: unicode
- @param s: a unicode value
- """
- self._check(pn_data_put_string(self._data, s.encode("utf8")))
-
- def put_symbol(self, s):
- """
- Puts a symbolic value.
-
- @type s: string
- @param s: the symbol name
- """
- self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
-
- def get_list(self):
- """
- If the current node is a list, return the number of elements,
- otherwise return zero. List elements can be accessed by entering
- the list.
-
- >>> count = data.get_list()
- >>> data.enter()
- >>> for i in range(count):
- ... type = data.next()
- ... if type == Data.STRING:
- ... print data.get_string()
- ... elif type == ...:
- ... ...
- >>> data.exit()
- """
- return pn_data_get_list(self._data)
-
- def get_map(self):
- """
- If the current node is a map, return the number of child elements,
- otherwise return zero. Key value pairs can be accessed by entering
- the map.
-
- >>> count = data.get_map()
- >>> data.enter()
- >>> for i in range(count/2):
- ... type = data.next()
- ... if type == Data.STRING:
- ... print data.get_string()
- ... elif type == ...:
- ... ...
- >>> data.exit()
- """
- return pn_data_get_map(self._data)
-
- def get_array(self):
- """
- If the current node is an array, return a tuple of the element
- count, a boolean indicating whether the array is described, and
- the type of each element, otherwise return (0, False, None). Array
- data can be accessed by entering the array.
-
- >>> # read an array of strings with a symbolic descriptor
- >>> count, described, type = data.get_array()
- >>> data.enter()
- >>> data.next()
- >>> print "Descriptor:", data.get_symbol()
- >>> for i in range(count):
- ... data.next()
- ... print "Element:", data.get_string()
- >>> data.exit()
- """
- count = pn_data_get_array(self._data)
- described = pn_data_is_array_described(self._data)
- type = pn_data_get_array_type(self._data)
- if type == -1:
- type = None
- return count, described, type
+# This private NullHandler is required for Python 2.6,
+# when we no longer support 2.6 replace this NullHandler class definition and assignment with:
+# handler = logging.NullHandler()
+class NullHandler(logging.Handler):
+ def handle(self, record):
+ pass
- def is_described(self):
- """
- Checks if the current node is a described value. The descriptor
- and value may be accessed by entering the described value.
+ def emit(self, record):
+ pass
- >>> # read a symbolically described string
- >>> assert data.is_described() # will error if the current node is not described
- >>> data.enter()
- >>> data.next()
- >>> print data.get_symbol()
- >>> data.next()
- >>> print data.get_string()
- >>> data.exit()
- """
- return pn_data_is_described(self._data)
+ def createLock(self):
+ self.lock = None
- def is_null(self):
- """
- Checks if the current node is a null.
- """
- return pn_data_is_null(self._data)
- def get_bool(self):
- """
- If the current node is a boolean, returns its value, returns False
- otherwise.
- """
- return pn_data_get_bool(self._data)
+handler = NullHandler()
- def get_ubyte(self):
- """
- If the current node is an unsigned byte, returns its value,
- returns 0 otherwise.
- """
- return ubyte(pn_data_get_ubyte(self._data))
+log = logging.getLogger("proton")
+log.addHandler(handler)
- def get_byte(self):
- """
- If the current node is a signed byte, returns its value, returns 0
- otherwise.
- """
- return byte(pn_data_get_byte(self._data))
- def get_ushort(self):
- """
- If the current node is an unsigned short, returns its value,
- returns 0 otherwise.
- """
- return ushort(pn_data_get_ushort(self._data))
-
- def get_short(self):
- """
- If the current node is a signed short, returns its value, returns
- 0 otherwise.
- """
- return short(pn_data_get_short(self._data))
-
- def get_uint(self):
- """
- If the current node is an unsigned int, returns its value, returns
- 0 otherwise.
- """
- return uint(pn_data_get_uint(self._data))
-
- def get_int(self):
- """
- If the current node is a signed int, returns its value, returns 0
- otherwise.
- """
- return int32(pn_data_get_int(self._data))
-
- def get_char(self):
- """
- If the current node is a char, returns its value, returns 0
- otherwise.
- """
- return char(_compat.unichr(pn_data_get_char(self._data)))
-
- def get_ulong(self):
- """
- If the current node is an unsigned long, returns its value,
- returns 0 otherwise.
- """
- return ulong(pn_data_get_ulong(self._data))
-
- def get_long(self):
- """
- If the current node is an signed long, returns its value, returns
- 0 otherwise.
- """
- return long(pn_data_get_long(self._data))
-
- def get_timestamp(self):
- """
- If the current node is a timestamp, returns its value, returns 0
- otherwise.
- """
- return timestamp(pn_data_get_timestamp(self._data))
-
- def get_float(self):
- """
- If the current node is a float, returns its value, raises 0
- otherwise.
- """
- return float32(pn_data_get_float(self._data))
-
- def get_double(self):
- """
- If the current node is a double, returns its value, returns 0
- otherwise.
- """
- return pn_data_get_double(self._data)
-
- # XXX: need to convert
- def get_decimal32(self):
- """
- If the current node is a decimal32, returns its value, returns 0
- otherwise.
- """
- return decimal32(pn_data_get_decimal32(self._data))
-
- # XXX: need to convert
- def get_decimal64(self):
- """
- If the current node is a decimal64, returns its value, returns 0
- otherwise.
- """
- return decimal64(pn_data_get_decimal64(self._data))
-
- # XXX: need to convert
- def get_decimal128(self):
- """
- If the current node is a decimal128, returns its value, returns 0
- otherwise.
- """
- return decimal128(pn_data_get_decimal128(self._data))
-
- def get_uuid(self):
- """
- If the current node is a UUID, returns its value, returns None
- otherwise.
- """
- if pn_data_type(self._data) == Data.UUID:
- return uuid.UUID(bytes=pn_data_get_uuid(self._data))
- else:
- return None
-
- def get_binary(self):
- """
- If the current node is binary, returns its value, returns ""
- otherwise.
- """
- return pn_data_get_binary(self._data)
-
- def get_string(self):
- """
- If the current node is a string, returns its value, returns ""
- otherwise.
- """
- return pn_data_get_string(self._data).decode("utf8")
-
- def get_symbol(self):
- """
- If the current node is a symbol, returns its value, returns ""
- otherwise.
- """
- return symbol(pn_data_get_symbol(self._data).decode('ascii'))
-
- def copy(self, src):
- self._check(pn_data_copy(self._data, src._data))
-
- def format(self):
- sz = 16
- while True:
- err, result = pn_data_format(self._data, sz)
- if err == PN_OVERFLOW:
- sz *= 2
- continue
- else:
- self._check(err)
- return result
-
- def dump(self):
- pn_data_dump(self._data)
-
- def put_dict(self, d):
- self.put_map()
- self.enter()
- try:
- for k, v in d.items():
- self.put_object(k)
- self.put_object(v)
- finally:
- self.exit()
-
- def get_dict(self):
- if self.enter():
- try:
- result = {}
- while self.next():
- k = self.get_object()
- if self.next():
- v = self.get_object()
- else:
- v = None
- result[k] = v
- finally:
- self.exit()
- return result
-
- def put_sequence(self, s):
- self.put_list()
- self.enter()
- try:
- for o in s:
- self.put_object(o)
- finally:
- self.exit()
-
- def get_sequence(self):
- if self.enter():
- try:
- result = []
- while self.next():
- result.append(self.get_object())
- finally:
- self.exit()
- return result
-
- def get_py_described(self):
- if self.enter():
- try:
- self.next()
- descriptor = self.get_object()
- self.next()
- value = self.get_object()
- finally:
- self.exit()
- return Described(descriptor, value)
-
- def put_py_described(self, d):
- self.put_described()
- self.enter()
- try:
- self.put_object(d.descriptor)
- self.put_object(d.value)
- finally:
- self.exit()
-
- def get_py_array(self):
- """
- If the current node is an array, return an Array object
- representing the array and its contents. Otherwise return None.
- This is a convenience wrapper around get_array, enter, etc.
- """
-
- count, described, type = self.get_array()
- if type is None: return None
- if self.enter():
- try:
- if described:
- self.next()
- descriptor = self.get_object()
- else:
- descriptor = UNDESCRIBED
- elements = []
- while self.next():
- elements.append(self.get_object())
- finally:
- self.exit()
- return Array(descriptor, type, *elements)
-
- def put_py_array(self, a):
- described = a.descriptor != UNDESCRIBED
- self.put_array(described, a.type)
- self.enter()
- try:
- if described:
- self.put_object(a.descriptor)
- for e in a.elements:
- self.put_object(e)
- finally:
- self.exit()
-
- put_mappings = {
- None.__class__: lambda s, _: s.put_null(),
- bool: put_bool,
- ubyte: put_ubyte,
- ushort: put_ushort,
- uint: put_uint,
- ulong: put_ulong,
- byte: put_byte,
- short: put_short,
- int32: put_int,
- long: put_long,
- float32: put_float,
- float: put_double,
- decimal32: put_decimal32,
- decimal64: put_decimal64,
- decimal128: put_decimal128,
- char: put_char,
- timestamp: put_timestamp,
- uuid.UUID: put_uuid,
- bytes: put_binary,
- unicode: put_string,
- symbol: put_symbol,
- list: put_sequence,
- tuple: put_sequence,
- dict: put_dict,
- Described: put_py_described,
- Array: put_py_array
- }
- # for python 3.x, long is merely an alias for int, but for python 2.x
- # we need to add an explicit int since it is a different type
- if int not in put_mappings:
- put_mappings[int] = put_int
- # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both.
- try: put_mappings[memoryview] = put_memoryview
- except NameError: pass
- try: put_mappings[buffer] = put_buffer
- except NameError: pass
- get_mappings = {
- NULL: lambda s: None,
- BOOL: get_bool,
- BYTE: get_byte,
- UBYTE: get_ubyte,
- SHORT: get_short,
- USHORT: get_ushort,
- INT: get_int,
- UINT: get_uint,
- CHAR: get_char,
- LONG: get_long,
- ULONG: get_ulong,
- TIMESTAMP: get_timestamp,
- FLOAT: get_float,
- DOUBLE: get_double,
- DECIMAL32: get_decimal32,
- DECIMAL64: get_decimal64,
- DECIMAL128: get_decimal128,
- UUID: get_uuid,
- BINARY: get_binary,
- STRING: get_string,
- SYMBOL: get_symbol,
- DESCRIBED: get_py_described,
- ARRAY: get_py_array,
- LIST: get_sequence,
- MAP: get_dict
- }
-
-
- def put_object(self, obj):
- putter = self.put_mappings[obj.__class__]
- putter(self, obj)
-
- def get_object(self):
- type = self.type()
- if type is None: return None
- getter = self.get_mappings.get(type)
- if getter:
- return getter(self)
- else:
- return UnmappedType(str(type))
-
-class ConnectionException(ProtonException):
- pass
-
-class Endpoint(object):
-
- LOCAL_UNINIT = PN_LOCAL_UNINIT
- REMOTE_UNINIT = PN_REMOTE_UNINIT
- LOCAL_ACTIVE = PN_LOCAL_ACTIVE
- REMOTE_ACTIVE = PN_REMOTE_ACTIVE
- LOCAL_CLOSED = PN_LOCAL_CLOSED
- REMOTE_CLOSED = PN_REMOTE_CLOSED
-
- def _init(self):
- self.condition = None
-
- def _update_cond(self):
- obj2cond(self.condition, self._get_cond_impl())
-
- @property
- def remote_condition(self):
- return cond2obj(self._get_remote_cond_impl())
-
- # the following must be provided by subclasses
- def _get_cond_impl(self):
- assert False, "Subclass must override this!"
-
- def _get_remote_cond_impl(self):
- assert False, "Subclass must override this!"
-
- def _get_handler(self):
- from . import reactor
- ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
- if ractor:
- on_error = ractor.on_error_delegate()
- else:
- on_error = None
- record = self._get_attachments()
- return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
-
- def _set_handler(self, handler):
- from . import reactor
- ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
- if ractor:
- on_error = ractor.on_error_delegate()
- else:
- on_error = None
- impl = _chandler(handler, on_error)
- record = self._get_attachments()
- pn_record_set_handler(record, impl)
- pn_decref(impl)
-
- handler = property(_get_handler, _set_handler)
-
- @property
- def transport(self):
- return self.connection.transport
-
-class Condition:
-
- def __init__(self, name, description=None, info=None):
- self.name = name
- self.description = description
- self.info = info
-
- def __repr__(self):
- return "Condition(%s)" % ", ".join([repr(x) for x in
- (self.name, self.description, self.info)
- if x])
-
- def __eq__(self, o):
- if not isinstance(o, Condition): return False
- return self.name == o.name and \
- self.description == o.description and \
- self.info == o.info
-
-def obj2cond(obj, cond):
- pn_condition_clear(cond)
- if obj:
- pn_condition_set_name(cond, str(obj.name))
- pn_condition_set_description(cond, obj.description)
- info = Data(pn_condition_info(cond))
- if obj.info:
- info.put_object(obj.info)
-
-def cond2obj(cond):
- if pn_condition_is_set(cond):
- return Condition(pn_condition_get_name(cond),
- pn_condition_get_description(cond),
- dat2obj(pn_condition_info(cond)))
- else:
- return None
-
-def dat2obj(dimpl):
- if dimpl:
- d = Data(dimpl)
- d.rewind()
- d.next()
- obj = d.get_object()
- d.rewind()
- return obj
-
-def obj2dat(obj, dimpl):
- if obj is not None:
- d = Data(dimpl)
- d.put_object(obj)
-
-def secs2millis(secs):
- return long(secs*1000)
-
-def millis2secs(millis):
- return float(millis)/1000.0
-
-def timeout2millis(secs):
- if secs is None: return PN_MILLIS_MAX
- return secs2millis(secs)
-
-def millis2timeout(millis):
- if millis == PN_MILLIS_MAX: return None
- return millis2secs(millis)
-
-def unicode2utf8(string):
- """Some Proton APIs expect a null terminated string. Convert python text
- types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
- This method will throw if the string cannot be converted.
- """
- if string is None:
- return None
- elif isinstance(string, str):
- # Must be py2 or py3 str
- # The swig binding converts py3 str -> utf8 char* and back sutomatically
- return string
- elif isinstance(string, unicode):
- # This must be python2 unicode as we already detected py3 str above
- return string.encode('utf-8')
- # Anything else illegal - specifically python3 bytes
- raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
-
-def utf82unicode(string):
- """Convert C strings returned from proton-c into python unicode"""
- if string is None:
- return None
- elif isinstance(string, unicode):
- # py2 unicode, py3 str (via hack definition)
- return string
- elif isinstance(string, bytes):
- # py2 str (via hack definition), py3 bytes
- return string.decode('utf8')
- raise TypeError("Unrecognized string type")
-
-class Connection(Wrapper, Endpoint):
- """
- A representation of an AMQP connection
- """
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Connection(impl)
-
- def __init__(self, impl = pn_connection):
- Wrapper.__init__(self, impl, pn_connection_attachments)
-
- def _init(self):
- Endpoint._init(self)
- self.offered_capabilities = None
- self.desired_capabilities = None
- self.properties = None
-
- def _get_attachments(self):
- return pn_connection_attachments(self._impl)
-
- @property
- def connection(self):
- return self
-
- @property
- def transport(self):
- return Transport.wrap(pn_connection_transport(self._impl))
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, ConnectionException)
- raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
- else:
- return err
-
- def _get_cond_impl(self):
- return pn_connection_condition(self._impl)
-
- def _get_remote_cond_impl(self):
- return pn_connection_remote_condition(self._impl)
-
- def collect(self, collector):
- if collector is None:
- pn_connection_collect(self._impl, None)
- else:
- pn_connection_collect(self._impl, collector._impl)
- self._collector = weakref.ref(collector)
-
- def _get_container(self):
- return utf82unicode(pn_connection_get_container(self._impl))
- def _set_container(self, name):
- return pn_connection_set_container(self._impl, unicode2utf8(name))
-
- container = property(_get_container, _set_container)
-
- def _get_hostname(self):
- return utf82unicode(pn_connection_get_hostname(self._impl))
- def _set_hostname(self, name):
- return pn_connection_set_hostname(self._impl, unicode2utf8(name))
-
- hostname = property(_get_hostname, _set_hostname,
- doc="""
-Set the name of the host (either fully qualified or relative) to which this
-connection is connecting to. This information may be used by the remote
-peer to determine the correct back-end service to connect the client to.
-This value will be sent in the Open performative, and will be used by SSL
-and SASL layers to identify the peer.
-""")
-
- def _get_user(self):
- return utf82unicode(pn_connection_get_user(self._impl))
- def _set_user(self, name):
- return pn_connection_set_user(self._impl, unicode2utf8(name))
-
- user = property(_get_user, _set_user)
-
- def _get_password(self):
- return None
- def _set_password(self, name):
- return pn_connection_set_password(self._impl, unicode2utf8(name))
-
- password = property(_get_password, _set_password)
-
- @property
- def remote_container(self):
- """The container identifier specified by the remote peer for this connection."""
- return pn_connection_remote_container(self._impl)
-
- @property
- def remote_hostname(self):
- """The hostname specified by the remote peer for this connection."""
- return pn_connection_remote_hostname(self._impl)
-
- @property
- def remote_offered_capabilities(self):
- """The capabilities offered by the remote peer for this connection."""
- return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
-
- @property
- def remote_desired_capabilities(self):
- """The capabilities desired by the remote peer for this connection."""
- return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
-
- @property
- def remote_properties(self):
- """The properties specified by the remote peer for this connection."""
- return dat2obj(pn_connection_remote_properties(self._impl))
-
- def open(self):
- """
- Opens the connection.
-
- In more detail, this moves the local state of the connection to
- the ACTIVE state and triggers an open frame to be sent to the
- peer. A connection is fully active once both peers have opened it.
- """
- obj2dat(self.offered_capabilities,
- pn_connection_offered_capabilities(self._impl))
- obj2dat(self.desired_capabilities,
- pn_connection_desired_capabilities(self._impl))
- obj2dat(self.properties, pn_connection_properties(self._impl))
- pn_connection_open(self._impl)
-
- def close(self):
- """
- Closes the connection.
-
- In more detail, this moves the local state of the connection to
- the CLOSED state and triggers a close frame to be sent to the
- peer. A connection is fully closed once both peers have closed it.
- """
- self._update_cond()
- pn_connection_close(self._impl)
- if hasattr(self, '_session_policy'):
- # break circular ref
- del self._session_policy
-
- @property
- def state(self):
- """
- The state of the connection as a bit field. The state has a local
- and a remote component. Each of these can be in one of three
- states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
- against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
- REMOTE_ACTIVE and REMOTE_CLOSED.
- """
- return pn_connection_state(self._impl)
-
- def session(self):
- """
- Returns a new session on this connection.
- """
- ssn = pn_session(self._impl)
- if ssn is None:
- raise(SessionException("Session allocation failed."))
- else:
- return Session(ssn)
-
- def session_head(self, mask):
- return Session.wrap(pn_session_head(self._impl, mask))
-
- def link_head(self, mask):
- return Link.wrap(pn_link_head(self._impl, mask))
-
- @property
- def work_head(self):
- return Delivery.wrap(pn_work_head(self._impl))
-
- @property
- def error(self):
- return pn_error_code(pn_connection_error(self._impl))
-
- def free(self):
- pn_connection_release(self._impl)
-
-class SessionException(ProtonException):
- pass
-
-class Session(Wrapper, Endpoint):
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Session(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_session_attachments)
-
- def _get_attachments(self):
- return pn_session_attachments(self._impl)
-
- def _get_cond_impl(self):
- return pn_session_condition(self._impl)
-
- def _get_remote_cond_impl(self):
- return pn_session_remote_condition(self._impl)
-
- def _get_incoming_capacity(self):
- return pn_session_get_incoming_capacity(self._impl)
-
- def _set_incoming_capacity(self, capacity):
- pn_session_set_incoming_capacity(self._impl, capacity)
-
- incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
-
- def _get_outgoing_window(self):
- return pn_session_get_outgoing_window(self._impl)
-
- def _set_outgoing_window(self, window):
- pn_session_set_outgoing_window(self._impl, window)
-
- outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
-
- @property
- def outgoing_bytes(self):
- return pn_session_outgoing_bytes(self._impl)
-
- @property
- def incoming_bytes(self):
- return pn_session_incoming_bytes(self._impl)
-
- def open(self):
- pn_session_open(self._impl)
-
- def close(self):
- self._update_cond()
- pn_session_close(self._impl)
-
- def next(self, mask):
- return Session.wrap(pn_session_next(self._impl, mask))
-
- @property
- def state(self):
- return pn_session_state(self._impl)
-
- @property
- def connection(self):
- return Connection.wrap(pn_session_connection(self._impl))
-
- def sender(self, name):
- return Sender(pn_sender(self._impl, unicode2utf8(name)))
-
- def receiver(self, name):
- return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
-
- def free(self):
- pn_session_free(self._impl)
-
-class LinkException(ProtonException):
- pass
-
-class Link(Wrapper, Endpoint):
- """
- A representation of an AMQP link, of which there are two concrete
- implementations, Sender and Receiver.
- """
-
- SND_UNSETTLED = PN_SND_UNSETTLED
- SND_SETTLED = PN_SND_SETTLED
- SND_MIXED = PN_SND_MIXED
-
- RCV_FIRST = PN_RCV_FIRST
- RCV_SECOND = PN_RCV_SECOND
-
- @staticmethod
- def wrap(impl):
- if impl is None: return None
- if pn_link_is_sender(impl):
- return Sender(impl)
- else:
- return Receiver(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_link_attachments)
-
- def _get_attachments(self):
- return pn_link_attachments(self._impl)
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, LinkException)
- raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
- else:
- return err
-
- def _get_cond_impl(self):
- return pn_link_condition(self._impl)
-
- def _get_remote_cond_impl(self):
- return pn_link_remote_condition(self._impl)
-
- def open(self):
- """
- Opens the link.
-
- In more detail, this moves the local state of the link to the
- ACTIVE state and triggers an attach frame to be sent to the
- peer. A link is fully active once both peers have attached it.
- """
- pn_link_open(self._impl)
-
- def close(self):
- """
- Closes the link.
-
- In more detail, this moves the local state of the link to the
- CLOSED state and triggers an detach frame (with the closed flag
- set) to be sent to the peer. A link is fully closed once both
- peers have detached it.
- """
- self._update_cond()
- pn_link_close(self._impl)
-
- @property
- def state(self):
- """
- The state of the link as a bit field. The state has a local
- and a remote component. Each of these can be in one of three
- states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
- against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
- REMOTE_ACTIVE and REMOTE_CLOSED.
- """
- return pn_link_state(self._impl)
-
- @property
- def source(self):
- """The source of the link as described by the local peer."""
- return Terminus(pn_link_source(self._impl))
-
- @property
- def target(self):
- """The target of the link as described by the local peer."""
- return Terminus(pn_link_target(self._impl))
-
- @property
- def remote_source(self):
- """The source of the link as described by the remote peer."""
- return Terminus(pn_link_remote_source(self._impl))
- @property
- def remote_target(self):
- """The target of the link as described by the remote peer."""
- return Terminus(pn_link_remote_target(self._impl))
-
- @property
- def session(self):
- return Session.wrap(pn_link_session(self._impl))
-
- @property
- def connection(self):
- """The connection on which this link was attached."""
- return self.session.connection
-
- def delivery(self, tag):
- return Delivery(pn_delivery(self._impl, tag))
-
- @property
- def current(self):
- return Delivery.wrap(pn_link_current(self._impl))
-
- def advance(self):
- return pn_link_advance(self._impl)
-
- @property
- def unsettled(self):
- return pn_link_unsettled(self._impl)
-
- @property
- def credit(self):
- """The amount of outstanding credit on this link."""
- return pn_link_credit(self._impl)
-
- @property
- def available(self):
- return pn_link_available(self._impl)
-
- @property
- def queued(self):
- return pn_link_queued(self._impl)
-
- def next(self, mask):
- return Link.wrap(pn_link_next(self._impl, mask))
-
- @property
- def name(self):
- """Returns the name of the link"""
- return utf82unicode(pn_link_name(self._impl))
-
- @property
- def is_sender(self):
- """Returns true if this link is a sender."""
- return pn_link_is_sender(self._impl)
-
- @property
- def is_receiver(self):
- """Returns true if this link is a receiver."""
- return pn_link_is_receiver(self._impl)
-
- @property
- def remote_snd_settle_mode(self):
- return pn_link_remote_snd_settle_mode(self._impl)
-
- @property
- def remote_rcv_settle_mode(self):
- return pn_link_remote_rcv_settle_mode(self._impl)
-
- def _get_snd_settle_mode(self):
- return pn_link_snd_settle_mode(self._impl)
- def _set_snd_settle_mode(self, mode):
- pn_link_set_snd_settle_mode(self._impl, mode)
- snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
-
- def _get_rcv_settle_mode(self):
- return pn_link_rcv_settle_mode(self._impl)
- def _set_rcv_settle_mode(self, mode):
- pn_link_set_rcv_settle_mode(self._impl, mode)
- rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
-
- def _get_drain(self):
- return pn_link_get_drain(self._impl)
-
- def _set_drain(self, b):
- pn_link_set_drain(self._impl, bool(b))
-
- drain_mode = property(_get_drain, _set_drain)
-
- def drained(self):
- return pn_link_drained(self._impl)
-
- @property
- def remote_max_message_size(self):
- return pn_link_remote_max_message_size(self._impl)
-
- def _get_max_message_size(self):
- return pn_link_max_message_size(self._impl)
- def _set_max_message_size(self, mode):
- pn_link_set_max_message_size(self._impl, mode)
- max_message_size = property(_get_max_message_size, _set_max_message_size)
-
- def detach(self):
- return pn_link_detach(self._impl)
-
- def free(self):
- pn_link_free(self._impl)
-
-class Terminus(object):
-
- UNSPECIFIED = PN_UNSPECIFIED
- SOURCE = PN_SOURCE
- TARGET = PN_TARGET
- COORDINATOR = PN_COORDINATOR
-
- NONDURABLE = PN_NONDURABLE
- CONFIGURATION = PN_CONFIGURATION
- DELIVERIES = PN_DELIVERIES
-
- DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
- DIST_MODE_COPY = PN_DIST_MODE_COPY
- DIST_MODE_MOVE = PN_DIST_MODE_MOVE
-
- EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
- EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
- EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
- EXPIRE_NEVER = PN_EXPIRE_NEVER
-
- def __init__(self, impl):
- self._impl = impl
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, LinkException)
- raise exc("[%s]" % err)
- else:
- return err
-
- def _get_type(self):
- return pn_terminus_get_type(self._impl)
- def _set_type(self, type):
- self._check(pn_terminus_set_type(self._impl, type))
- type = property(_get_type, _set_type)
-
- def _get_address(self):
- """The address that identifies the source or target node"""
- return utf82unicode(pn_terminus_get_address(self._impl))
- def _set_address(self, address):
- self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
- address = property(_get_address, _set_address)
-
- def _get_durability(self):
- return pn_terminus_get_durability(self._impl)
- def _set_durability(self, seconds):
- self._check(pn_terminus_set_durability(self._impl, seconds))
- durability = property(_get_durability, _set_durability)
-
- def _get_expiry_policy(self):
- return pn_terminus_get_expiry_policy(self._impl)
- def _set_expiry_policy(self, seconds):
- self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
- expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
-
- def _get_timeout(self):
- return pn_terminus_get_timeout(self._impl)
- def _set_timeout(self, seconds):
- self._check(pn_terminus_set_timeout(self._impl, seconds))
- timeout = property(_get_timeout, _set_timeout)
-
- def _is_dynamic(self):
- """Indicates whether the source or target node was dynamically
- created"""
- return pn_terminus_is_dynamic(self._impl)
- def _set_dynamic(self, dynamic):
- self._check(pn_terminus_set_dynamic(self._impl, dynamic))
- dynamic = property(_is_dynamic, _set_dynamic)
-
- def _get_distribution_mode(self):
- return pn_terminus_get_distribution_mode(self._impl)
- def _set_distribution_mode(self, mode):
- self._check(pn_terminus_set_distribution_mode(self._impl, mode))
- distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
-
- @property
- def properties(self):
- """Properties of a dynamic source or target."""
- return Data(pn_terminus_properties(self._impl))
-
- @property
- def capabilities(self):
- """Capabilities of the source or target."""
- return Data(pn_terminus_capabilities(self._impl))
-
- @property
- def outcomes(self):
- return Data(pn_terminus_outcomes(self._impl))
-
- @property
- def filter(self):
- """A filter on a source allows the set of messages transfered over
- the link to be restricted"""
- return Data(pn_terminus_filter(self._impl))
-
- def copy(self, src):
- self._check(pn_terminus_copy(self._impl, src._impl))
-
-class Sender(Link):
- """
- A link over which messages are sent.
- """
-
- def offered(self, n):
- pn_link_offered(self._impl, n)
-
- def stream(self, data):
- """
- Send specified data as part of the current delivery
-
- @type data: binary
- @param data: data to send
- """
- return self._check(pn_link_send(self._impl, data))
-
- def send(self, obj, tag=None):
- """
- Send specified object over this sender; the object is expected to
- have a send() method on it that takes the sender and an optional
- tag as arguments.
-
- Where the object is a Message, this will send the message over
- this link, creating a new delivery for the purpose.
- """
- if hasattr(obj, 'send'):
- return obj.send(self, tag=tag)
- else:
- # treat object as bytes
- return self.stream(obj)
-
- def delivery_tag(self):
- if not hasattr(self, 'tag_generator'):
- def simple_tags():
- count = 1
- while True:
- yield str(count)
- count += 1
- self.tag_generator = simple_tags()
- return next(self.tag_generator)
-
-class Receiver(Link):
- """
- A link over which messages are received.
- """
-
- def flow(self, n):
- """Increases the credit issued to the remote sender by the specified number of messages."""
- pn_link_flow(self._impl, n)
-
- def recv(self, limit):
- n, binary = pn_link_recv(self._impl, limit)
- if n == PN_EOS:
- return None
- else:
- self._check(n)
- return binary
-
- def drain(self, n):
- pn_link_drain(self._impl, n)
-
- def draining(self):
- return pn_link_draining(self._impl)
-
-class NamedInt(int):
-
- values = {}
-
- def __new__(cls, i, name):
- ni = super(NamedInt, cls).__new__(cls, i)
- cls.values[i] = ni
- return ni
-
- def __init__(self, i, name):
- self.name = name
-
- def __repr__(self):
- return self.name
-
- def __str__(self):
- return self.name
-
- @classmethod
- def get(cls, i):
- return cls.values.get(i, i)
-
-class DispositionType(NamedInt):
- values = {}
-
-class Disposition(object):
-
- RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
- ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
- REJECTED = DispositionType(PN_REJECTED, "REJECTED")
- RELEASED = DispositionType(PN_RELEASED, "RELEASED")
- MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
-
- def __init__(self, impl, local):
- self._impl = impl
- self.local = local
- self._data = None
- self._condition = None
- self._annotations = None
-
- @property
- def type(self):
- return DispositionType.get(pn_disposition_type(self._impl))
-
- def _get_section_number(self):
- return pn_disposition_get_section_number(self._impl)
- def _set_section_number(self, n):
- pn_disposition_set_section_number(self._impl, n)
- section_number = property(_get_section_number, _set_section_number)
-
- def _get_section_offset(self):
- return pn_disposition_get_section_offset(self._impl)
- def _set_section_offset(self, n):
- pn_disposition_set_section_offset(self._impl, n)
- section_offset = property(_get_section_offset, _set_section_offset)
-
- def _get_failed(self):
- return pn_disposition_is_failed(self._impl)
- def _set_failed(self, b):
- pn_disposition_set_failed(self._impl, b)
- failed = property(_get_failed, _set_failed)
-
- def _get_undeliverable(self):
- return pn_disposition_is_undeliverable(self._impl)
- def _set_undeliverable(self, b):
- pn_disposition_set_undeliverable(self._impl, b)
- undeliverable = property(_get_undeliverable, _set_undeliverable)
-
- def _get_data(self):
- if self.local:
- return self._data
- else:
- return dat2obj(pn_disposition_data(self._impl))
- def _set_data(self, obj):
- if self.local:
- self._data = obj
- else:
- raise AttributeError("data attribute is read-only")
- data = property(_get_data, _set_data)
-
- def _get_annotations(self):
- if self.local:
- return self._annotations
- else:
- return dat2obj(pn_disposition_annotations(self._impl))
- def _set_annotations(self, obj):
- if self.local:
- self._annotations = obj
- else:
- raise AttributeError("annotations attribute is read-only")
- annotations = property(_get_annotations, _set_annotations)
-
- def _get_condition(self):
- if self.local:
- return self._condition
- else:
- return cond2obj(pn_disposition_condition(self._impl))
- def _set_condition(self, obj):
- if self.local:
- self._condition = obj
- else:
- raise AttributeError("condition attribute is read-only")
- condition = property(_get_condition, _set_condition)
-
-class Delivery(Wrapper):
- """
- Tracks and/or records the delivery of a message over a link.
- """
-
- RECEIVED = Disposition.RECEIVED
- ACCEPTED = Disposition.ACCEPTED
- REJECTED = Disposition.REJECTED
- RELEASED = Disposition.RELEASED
- MODIFIED = Disposition.MODIFIED
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Delivery(impl)
-
- def __init__(self, impl):
- Wrapper.__init__(self, impl, pn_delivery_attachments)
-
- def _init(self):
- self.local = Disposition(pn_delivery_local(self._impl), True)
- self.remote = Disposition(pn_delivery_remote(self._impl), False)
-
- @property
- def tag(self):
- """The identifier for the delivery."""
- return pn_delivery_tag(self._impl)
-
- @property
- def writable(self):
- """Returns true for an outgoing delivery to which data can now be written."""
- return pn_delivery_writable(self._impl)
-
- @property
- def readable(self):
- """Returns true for an incoming delivery that has data to read."""
- return pn_delivery_readable(self._impl)
-
- @property
- def updated(self):
- """Returns true if the state of the delivery has been updated
- (e.g. it has been settled and/or accepted, rejected etc)."""
- return pn_delivery_updated(self._impl)
-
- def update(self, state):
- """
- Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
- """
- obj2dat(self.local._data, pn_disposition_data(self.local._impl))
- obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
- obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
- pn_delivery_update(self._impl, state)
-
- @property
- def pending(self):
- return pn_delivery_pending(self._impl)
-
- @property
- def partial(self):
- """
- Returns true for an incoming delivery if not all the data is
- yet available.
- """
- return pn_delivery_partial(self._impl)
-
- @property
- def local_state(self):
- """Returns the local state of the delivery."""
- return DispositionType.get(pn_delivery_local_state(self._impl))
-
- @property
- def remote_state(self):
- """
- Returns the state of the delivery as indicated by the remote
- peer.
- """
- return DispositionType.get(pn_delivery_remote_state(self._impl))
-
- @property
- def settled(self):
- """
- Returns true if the delivery has been settled by the remote peer.
- """
- return pn_delivery_settled(self._impl)
-
- def settle(self):
- """
- Settles the delivery locally. This indicates the application
- considers the delivery complete and does not wish to receive any
- further events about it. Every delivery should be settled locally.
- """
- pn_delivery_settle(self._impl)
-
- @property
- def aborted(self):
- """Returns true if the delivery has been aborted."""
- return pn_delivery_aborted(self._impl)
-
- def abort(self):
- """
- Aborts the delivery. This indicates the application wishes to
- invalidate any data that may have already been sent on this delivery.
- The delivery cannot be aborted after it has been completely delivered.
- """
- pn_delivery_abort(self._impl)
-
- @property
- def work_next(self):
- return Delivery.wrap(pn_work_next(self._impl))
-
- @property
- def link(self):
- """
- Returns the link on which the delivery was sent or received.
- """
- return Link.wrap(pn_delivery_link(self._impl))
-
- @property
- def session(self):
- """
- Returns the session over which the delivery was sent or received.
- """
- return self.link.session
-
- @property
- def connection(self):
- """
- Returns the connection over which the delivery was sent or received.
- """
- return self.session.connection
-
- @property
- def transport(self):
- return self.connection.transport
-
-class TransportException(ProtonException):
- pass
-
-class TraceAdapter:
-
- def __init__(self, tracer):
- self.tracer = tracer
-
- def __call__(self, trans_impl, message):
- self.tracer(Transport.wrap(trans_impl), message)
-
-class Transport(Wrapper):
-
- TRACE_OFF = PN_TRACE_OFF
- TRACE_DRV = PN_TRACE_DRV
- TRACE_FRM = PN_TRACE_FRM
- TRACE_RAW = PN_TRACE_RAW
-
- CLIENT = 1
- SERVER = 2
-
- @staticmethod
- def wrap(impl):
- if impl is None:
- return None
- else:
- return Transport(_impl=impl)
-
- def __init__(self, mode=None, _impl = pn_transport):
- Wrapper.__init__(self, _impl, pn_transport_attachments)
- if mode == Transport.SERVER:
- pn_transport_set_server(self._impl)
- elif mode is None or mode==Transport.CLIENT:
- pass
- else:
- raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
-
- def _init(self):
- self._sasl = None
- self._ssl = None
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, TransportException)
- raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
- else:
- return err
-
- def _set_tracer(self, tracer):
- pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
-
- def _get_tracer(self):
- adapter = pn_transport_get_pytracer(self._impl)
- if adapter:
- return adapter.tracer
- else:
- return None
-
- tracer = property(_get_tracer, _set_tracer,
- doc="""
-A callback for trace logging. The callback is passed the transport and log message.
-""")
-
- def log(self, message):
- pn_transport_log(self._impl, message)
-
- def require_auth(self, bool):
- pn_transport_require_auth(self._impl, bool)
-
- @property
- def authenticated(self):
- return pn_transport_is_authenticated(self._impl)
-
- def require_encryption(self, bool):
- pn_transport_require_encryption(self._impl, bool)
-
- @property
- def encrypted(self):
- return pn_transport_is_encrypted(self._impl)
-
- @property
- def user(self):
- return pn_transport_get_user(self._impl)
-
- def bind(self, connection):
- """Assign a connection to the transport"""
- self._check(pn_transport_bind(self._impl, connection._impl))
-
- def unbind(self):
- """Release the connection"""
- self._check(pn_transport_unbind(self._impl))
-
- def trace(self, n):
- pn_transport_trace(self._impl, n)
-
- def tick(self, now):
- """Process any timed events (like heartbeat generation).
- now = seconds since epoch (float).
- """
- return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
-
- def capacity(self):
- c = pn_transport_capacity(self._impl)
- if c >= PN_EOS:
- return c
- else:
- return self._check(c)
-
- def push(self, binary):
- n = self._check(pn_transport_push(self._impl, binary))
- if n != len(binary):
- raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
-
- def close_tail(self):
- self._check(pn_transport_close_tail(self._impl))
-
- def pending(self):
- p = pn_transport_pending(self._impl)
- if p >= PN_EOS:
- return p
- else:
- return self._check(p)
-
- def peek(self, size):
- cd, out = pn_transport_peek(self._impl, size)
- if cd == PN_EOS:
- return None
- else:
- self._check(cd)
- return out
-
- def pop(self, size):
- pn_transport_pop(self._impl, size)
-
- def close_head(self):
- self._check(pn_transport_close_head(self._impl))
-
- @property
- def closed(self):
- return pn_transport_closed(self._impl)
-
- # AMQP 1.0 max-frame-size
- def _get_max_frame_size(self):
- return pn_transport_get_max_frame(self._impl)
-
- def _set_max_frame_size(self, value):
- pn_transport_set_max_frame(self._impl, value)
-
- max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
- doc="""
-Sets the maximum size for received frames (in bytes).
-""")
-
- @property
- def remote_max_frame_size(self):
- return pn_transport_get_remote_max_frame(self._impl)
-
- def _get_channel_max(self):
- return pn_transport_get_channel_max(self._impl)
-
- def _set_channel_max(self, value):
- if pn_transport_set_channel_max(self._impl, value):
- raise SessionException("Too late to change channel max.")
-
- channel_max = property(_get_channel_max, _set_channel_max,
- doc="""
-Sets the maximum channel that may be used on the transport.
-""")
-
- @property
- def remote_channel_max(self):
- return pn_transport_remote_channel_max(self._impl)
-
- # AMQP 1.0 idle-time-out
- def _get_idle_timeout(self):
- return millis2secs(pn_transport_get_idle_timeout(self._impl))
-
- def _set_idle_timeout(self, sec):
- pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
-
- idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
- doc="""
-The idle timeout of the connection (float, in seconds).
-""")
-
- @property
- def remote_idle_timeout(self):
- return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
-
- @property
- def frames_output(self):
- return pn_transport_get_frames_output(self._impl)
-
- @property
- def frames_input(self):
- return pn_transport_get_frames_input(self._impl)
-
- def sasl(self):
- return SASL(self)
-
- def ssl(self, domain=None, session_details=None):
- # SSL factory (singleton for this transport)
- if not self._ssl:
- self._ssl = SSL(self, domain, session_details)
- return self._ssl
-
- @property
- def condition(self):
- return cond2obj(pn_transport_condition(self._impl))
-
- @property
- def connection(self):
- return Connection.wrap(pn_transport_connection(self._impl))
-
-class SASLException(TransportException):
- pass
-
-class SASL(Wrapper):
-
- OK = PN_SASL_OK
- AUTH = PN_SASL_AUTH
- SYS = PN_SASL_SYS
- PERM = PN_SASL_PERM
- TEMP = PN_SASL_TEMP
-
- @staticmethod
- def extended():
- return pn_sasl_extended()
-
- def __init__(self, transport):
- Wrapper.__init__(self, transport._impl, pn_transport_attachments)
- self._sasl = pn_sasl(transport._impl)
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, SASLException)
- raise exc("[%s]" % (err))
- else:
- return err
-
- @property
- def user(self):
- return pn_sasl_get_user(self._sasl)
-
- @property
- def mech(self):
- return pn_sasl_get_mech(self._sasl)
-
- @property
- def outcome(self):
- outcome = pn_sasl_outcome(self._sasl)
- if outcome == PN_SASL_NONE:
- return None
- else:
- return outcome
-
- def allowed_mechs(self, mechs):
- pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
-
- def _get_allow_insecure_mechs(self):
- return pn_sasl_get_allow_insecure_mechs(self._sasl)
-
- def _set_allow_insecure_mechs(self, insecure):
- pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
-
- allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
- doc="""
-Allow unencrypted cleartext passwords (PLAIN mech)
-""")
-
- def done(self, outcome):
- pn_sasl_done(self._sasl, outcome)
-
- def config_name(self, name):
- pn_sasl_config_name(self._sasl, name)
-
- def config_path(self, path):
- pn_sasl_config_path(self._sasl, path)
-
-class SSLException(TransportException):
- pass
-
-class SSLUnavailable(SSLException):
- pass
-
-class SSLDomain(object):
-
- MODE_CLIENT = PN_SSL_MODE_CLIENT
- MODE_SERVER = PN_SSL_MODE_SERVER
- VERIFY_PEER = PN_SSL_VERIFY_PEER
- VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
- ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
-
- def __init__(self, mode):
- self._domain = pn_ssl_domain(mode)
- if self._domain is None:
- raise SSLUnavailable()
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, SSLException)
- raise exc("SSL failure.")
- else:
- return err
-
- def set_credentials(self, cert_file, key_file, password):
- return self._check( pn_ssl_domain_set_credentials(self._domain,
- cert_file, key_file,
- password) )
- def set_trusted_ca_db(self, certificate_db):
- return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
- certificate_db) )
- def set_peer_authentication(self, verify_mode, trusted_CAs=None):
- return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
- verify_mode,
- trusted_CAs) )
-
- def allow_unsecured_client(self):
- return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
-
- def __del__(self):
- pn_ssl_domain_free(self._domain)
-
-class SSL(object):
-
- @staticmethod
- def present():
- return pn_ssl_present()
-
- def _check(self, err):
- if err < 0:
- exc = EXCEPTIONS.get(err, SSLException)
- raise exc("SSL failure.")
- else:
- return err
-
- def __new__(cls, transport, domain, session_details=None):
- """Enforce a singleton SSL object per Transport"""
- if transport._ssl:
- # unfortunately, we've combined the allocation and the configuration in a
- # single step. So catch any attempt by the application to provide what
- # may be a different configuration than the original (hack)
- ssl = transport._ssl
- if (domain and (ssl._domain is not domain) or
- session_details and (ssl._session_details is not session_details)):
- raise SSLException("Cannot re-configure existing SSL object!")
- else:
- obj = super(SSL, cls).__new__(cls)
- obj._domain = domain
- obj._session_details = session_details
- session_id = None
- if session_details:
- session_id = session_details.get_session_id()
- obj._ssl = pn_ssl( transport._impl )
- if obj._ssl is None:
- raise SSLUnavailable()
- if domain:
- pn_ssl_init( obj._ssl, domain._domain, session_id )
- transport._ssl = obj
- return transport._ssl
-
- def cipher_name(self):
- rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
- if rc:
- return name
- return None
-
- def protocol_name(self):
- rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
- if rc:
- return name
- return None
-
- SHA1 = PN_SSL_SHA1
- SHA256 = PN_SSL_SHA256
- SHA512 = PN_SSL_SHA512
- MD5 = PN_SSL_MD5
-
- CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
- CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
- CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
- CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
- CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
- CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
-
- def get_cert_subject_subfield(self, subfield_name):
- subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
- return subfield_value
-
- def get_cert_subject(self):
- subject = pn_ssl_get_remote_subject(self._ssl)
- return subject
-
- def _get_cert_subject_unknown_subfield(self):
- # Pass in an unhandled enum
- return self.get_cert_subject_subfield(10)
-
- # Convenience functions for obtaining the subfields of the subject field.
- def get_cert_common_name(self):
- return self.get_cert_subject_subfield(SSL.CERT_COMMON_NAME)
-
- def get_cert_organization(self):
- return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_NAME)
-
- def get_cert_organization_unit(self):
- return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_UNIT)
-
- def get_cert_locality_or_city(self):
- return self.get_cert_subject_subfield(SSL.CERT_CITY_OR_LOCALITY)
-
- def get_cert_country(self):
- return self.get_cert_subject_subfield(SSL.CERT_COUNTRY_NAME)
-
- def get_cert_state_or_province(self):
- return self.get_cert_subject_subfield(SSL.CERT_STATE_OR_PROVINCE)
-
- def get_cert_fingerprint(self, fingerprint_length, digest_name):
- rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
- if rc == PN_OK:
- return fingerprint_str
- return None
-
- # Convenience functions for obtaining fingerprint for specific hashing algorithms
- def _get_cert_fingerprint_unknown_hash_alg(self):
- return self.get_cert_fingerprint(41, 10)
-
- def get_cert_fingerprint_sha1(self):
- return self.get_cert_fingerprint(41, SSL.SHA1)
-
- def get_cert_fingerprint_sha256(self):
- # sha256 produces a fingerprint that is 64 characters long
- return self.get_cert_fingerprint(65, SSL.SHA256)
-
- def get_cert_fingerprint_sha512(self):
- # sha512 produces a fingerprint that is 128 characters long
- return self.get_cert_fingerprint(129, SSL.SHA512)
-
- def get_cert_fingerprint_md5(self):
- return self.get_cert_fingerprint(33, SSL.MD5)
-
- @property
- def remote_subject(self):
- return pn_ssl_get_remote_subject( self._ssl )
-
- RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
- RESUME_NEW = PN_SSL_RESUME_NEW
- RESUME_REUSED = PN_SSL_RESUME_REUSED
-
- def resume_status(self):
- return pn_ssl_resume_status( self._ssl )
-
- def _set_peer_hostname(self, hostname):
- self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
- def
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org