You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/04 15:57:37 UTC
[05/41] qpid-proton git commit: PROTON-1850: Split up proton
__init__.py into multiple files - Reformatted python source to (mostly) PEP-8
standards - Control what gets exported from __init__ by restricting what it
imports - Move most of the reactor impl
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_common.py
----------------------------------------------------------------------
diff --git a/python/proton/_common.py b/python/proton/_common.py
new file mode 100644
index 0000000..3715c6a
--- /dev/null
+++ b/python/proton/_common.py
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+#
+# Hacks to provide Python2 <---> Python3 compatibility
+#
+# The results are
+# | |long|unicode|
+# |python2|long|unicode|
+# |python3| int| str|
+try:
+ long()
+except NameError:
+ long = int
+try:
+ unicode()
+except NameError:
+ unicode = str
+
+
+def isinteger(value):
+ return isinstance(value, (int, long))
+
+
+def isstring(value):
+ return isinstance(value, (str, unicode))
+
+
+class Constant(object):
+
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.name
+
+
+def secs2millis(secs):
+ return long(secs * 1000)
+
+
+def millis2secs(millis):
+ return float(millis) / 1000.0
+
+
+def unicode2utf8(string):
+ """Some Proton APIs expect a null terminated string. Convert python text
+ types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
+ This method will throw if the string cannot be converted.
+ """
+ if string is None:
+ return None
+ elif isinstance(string, str):
+ # Must be py2 or py3 str
+ # The swig binding converts py3 str -> utf8 char* and back automatically
+ return string
+ elif isinstance(string, unicode):
+ # This must be python2 unicode as we already detected py3 str above
+ return string.encode('utf-8')
+ # Anything else illegal - specifically python3 bytes
+ raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
+
+
+def utf82unicode(string):
+ """Convert C strings returned from proton-c into python unicode"""
+ if string is None:
+ return None
+ elif isinstance(string, unicode):
+ # py2 unicode, py3 str (via hack definition)
+ return string
+ elif isinstance(string, bytes):
+ # py2 str (via hack definition), py3 bytes
+ return string.decode('utf8')
+ raise TypeError("Unrecognized string type")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_compat.py
----------------------------------------------------------------------
diff --git a/python/proton/_compat.py b/python/proton/_compat.py
index afd82e3..eae4c84 100644
--- a/python/proton/_compat.py
+++ b/python/proton/_compat.py
@@ -32,8 +32,6 @@ except ImportError:
PY3 = sys.version_info[0] == 3
if PY3:
- string_types = (str,)
-
def raise_(t, v=None, tb=None):
"""Mimic the old 2.x raise behavior:
Raise an exception of type t with value v using optional traceback tb
@@ -45,23 +43,22 @@ if PY3:
else:
raise v.with_traceback(tb)
+
def iteritems(d, **kw):
return iter(d.items(**kw))
+
unichr = chr
else:
- # includes both unicode and non-unicode strings:
- string_types = (basestring,)
-
# the raise syntax will cause a parse error in Py3, so 'sneak' in a
# definition that won't cause the parser to barf
- exec("""def raise_(t, v=None, tb=None):
+ exec ("""def raise_(t, v=None, tb=None):
raise t, v, tb
""")
+
def iteritems(d, **kw):
return d.iteritems(**kw)
- unichr = unichr
-__all__ = [ 'PY3', 'queue', 'string_types', 'raise_', 'iteritems', 'unichr']
\ No newline at end of file
+ unichr = unichr
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_condition.py
----------------------------------------------------------------------
diff --git a/python/proton/_condition.py b/python/proton/_condition.py
new file mode 100644
index 0000000..e5dbde9
--- /dev/null
+++ b/python/proton/_condition.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import pn_condition_clear, pn_condition_set_name, pn_condition_set_description, pn_condition_info, \
+ pn_condition_is_set, pn_condition_get_name, pn_condition_get_description
+
+from ._data import Data, dat2obj
+
+
+class Condition:
+
+ def __init__(self, name, description=None, info=None):
+ self.name = name
+ self.description = description
+ self.info = info
+
+ def __repr__(self):
+ return "Condition(%s)" % ", ".join([repr(x) for x in
+ (self.name, self.description, self.info)
+ if x])
+
+ def __eq__(self, o):
+ if not isinstance(o, Condition): return False
+ return self.name == o.name and \
+ self.description == o.description and \
+ self.info == o.info
+
+
+def obj2cond(obj, cond):
+ pn_condition_clear(cond)
+ if obj:
+ pn_condition_set_name(cond, str(obj.name))
+ pn_condition_set_description(cond, obj.description)
+ info = Data(pn_condition_info(cond))
+ if obj.info:
+ info.put_object(obj.info)
+
+
+def cond2obj(cond):
+ if pn_condition_is_set(cond):
+ return Condition(pn_condition_get_name(cond),
+ pn_condition_get_description(cond),
+ dat2obj(pn_condition_info(cond)))
+ else:
+ return None
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_data.py
----------------------------------------------------------------------
diff --git a/python/proton/_data.py b/python/proton/_data.py
new file mode 100644
index 0000000..f4ad381
--- /dev/null
+++ b/python/proton/_data.py
@@ -0,0 +1,1129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import uuid
+
+from cproton import PN_TIMESTAMP, PN_FLOAT, PN_DESCRIBED, PN_DECIMAL64, PN_UBYTE, PN_UUID, PN_NULL, PN_BINARY, \
+ PN_LIST, PN_OVERFLOW, PN_MAP, PN_LONG, PN_SHORT, PN_CHAR, PN_UINT, PN_ULONG, PN_STRING, PN_USHORT, PN_DOUBLE, \
+ PN_BYTE, PN_DECIMAL32, PN_DECIMAL128, PN_ARRAY, PN_SYMBOL, PN_BOOL, PN_INT, \
+ pn_data_get_binary, pn_data_get_decimal64, pn_data_put_symbol, pn_data_put_float, \
+ pn_data_is_array_described, pn_data_exit, pn_data_put_uint, pn_data_put_decimal128, \
+ pn_data_lookup, pn_data_put_char, pn_data_encoded_size, pn_data_get_bool, \
+ pn_data_get_short, pn_data_prev, pn_data_type, pn_data_widen, pn_data_put_decimal64, \
+ pn_data_put_string, pn_data_get_array, pn_data_put_ulong, pn_data_get_byte, pn_data_get_symbol, pn_data_encode, \
+ pn_data_rewind, pn_data_put_bool, pn_data_is_null, pn_data_error, \
+ pn_data_put_double, pn_data_copy, pn_data_put_int, pn_data_get_ubyte, pn_data_free, pn_data_clear, \
+ pn_data_get_double, pn_data_put_byte, pn_data_put_uuid, pn_data_put_ushort, pn_data_is_described, \
+ pn_data_get_float, pn_data_get_uint, pn_data_put_described, pn_data_get_decimal128, pn_data, \
+ pn_data_get_array_type, pn_data_put_map, pn_data_put_list, pn_data_get_string, pn_data_get_char, \
+ pn_data_put_decimal32, pn_data_enter, pn_data_put_short, pn_data_put_timestamp, \
+ pn_data_get_long, pn_data_get_map, pn_data_narrow, pn_data_put_array, pn_data_get_ushort, \
+ pn_data_get_int, pn_data_get_list, pn_data_get_ulong, pn_data_put_ubyte, \
+ pn_data_format, pn_data_dump, pn_data_get_uuid, pn_data_get_decimal32, \
+ pn_data_put_binary, pn_data_get_timestamp, pn_data_decode, pn_data_next, pn_data_put_null, pn_data_put_long, \
+ pn_error_text
+
+from ._common import Constant
+from ._exceptions import EXCEPTIONS, DataException
+
+from . import _compat
+
+#
+# Hacks to provide Python2 <---> Python3 compatibility
+#
+# The results are
+# | |long|unicode|
+# |python2|long|unicode|
+# |python3| int| str|
+try:
+ long()
+except NameError:
+ long = int
+try:
+ unicode()
+except NameError:
+ unicode = str
+
+
+class UnmappedType:
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __repr__(self):
+ return "UnmappedType(%s)" % self.msg
+
+
+class ulong(long):
+
+ def __repr__(self):
+ return "ulong(%s)" % long.__repr__(self)
+
+
+class timestamp(long):
+
+ def __repr__(self):
+ return "timestamp(%s)" % long.__repr__(self)
+
+
+class symbol(unicode):
+
+ def __repr__(self):
+ return "symbol(%s)" % unicode.__repr__(self)
+
+
+class char(unicode):
+
+ def __repr__(self):
+ return "char(%s)" % unicode.__repr__(self)
+
+
+class byte(int):
+
+ def __repr__(self):
+ return "byte(%s)" % int.__repr__(self)
+
+
+class short(int):
+
+ def __repr__(self):
+ return "short(%s)" % int.__repr__(self)
+
+
+class int32(int):
+
+ def __repr__(self):
+ return "int32(%s)" % int.__repr__(self)
+
+
+class ubyte(int):
+
+ def __repr__(self):
+ return "ubyte(%s)" % int.__repr__(self)
+
+
+class ushort(int):
+
+ def __repr__(self):
+ return "ushort(%s)" % int.__repr__(self)
+
+
+class uint(long):
+
+ def __repr__(self):
+ return "uint(%s)" % long.__repr__(self)
+
+
+class float32(float):
+
+ def __repr__(self):
+ return "float32(%s)" % float.__repr__(self)
+
+
+class decimal32(int):
+
+ def __repr__(self):
+ return "decimal32(%s)" % int.__repr__(self)
+
+
+class decimal64(long):
+
+ def __repr__(self):
+ return "decimal64(%s)" % long.__repr__(self)
+
+
+class decimal128(bytes):
+
+ def __repr__(self):
+ return "decimal128(%s)" % bytes.__repr__(self)
+
+
+class Described(object):
+
+ def __init__(self, descriptor, value):
+ self.descriptor = descriptor
+ self.value = value
+
+ def __repr__(self):
+ return "Described(%r, %r)" % (self.descriptor, self.value)
+
+ def __eq__(self, o):
+ if isinstance(o, Described):
+ return self.descriptor == o.descriptor and self.value == o.value
+ else:
+ return False
+
+
+UNDESCRIBED = Constant("UNDESCRIBED")
+
+
+class Array(object):
+
+ def __init__(self, descriptor, type, *elements):
+ self.descriptor = descriptor
+ self.type = type
+ self.elements = elements
+
+ def __iter__(self):
+ return iter(self.elements)
+
+ def __repr__(self):
+ if self.elements:
+ els = ", %s" % (", ".join(map(repr, self.elements)))
+ else:
+ els = ""
+ return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
+
+ def __eq__(self, o):
+ if isinstance(o, Array):
+ return self.descriptor == o.descriptor and \
+ self.type == o.type and self.elements == o.elements
+ else:
+ return False
+
+
+class Data:
+ """
+ The L{Data} class provides an interface for decoding, extracting,
+ creating, and encoding arbitrary AMQP data. A L{Data} object
+ contains a tree of AMQP values. Leaf nodes in this tree correspond
+ to scalars in the AMQP type system such as L{ints<INT>} or
+ L{strings<STRING>}. Non-leaf nodes in this tree correspond to
+ compound values in the AMQP type system such as L{lists<LIST>},
+ L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
+ The root node of the tree is the L{Data} object itself and can have
+ an arbitrary number of children.
+
+ A L{Data} object maintains the notion of the current sibling node
+ and a current parent node. Siblings are ordered within their parent.
+ Values are accessed and/or added by using the L{next}, L{prev},
+ L{enter}, and L{exit} methods to navigate to the desired location in
+ the tree and using the supplied variety of put_*/get_* methods to
+ access or add a value of the desired type.
+
+ The put_* methods will always add a value I{after} the current node
+ in the tree. If the current node has a next sibling the put_* method
+ will overwrite the value on this node. If there is no current node
+ or the current node has no next sibling then one will be added. The
+ put_* methods always set the added/modified node to the current
+ node. The get_* methods read the value of the current node and do
+ not change which node is current.
+
+ The following types of scalar values are supported:
+
+ - L{NULL}
+ - L{BOOL}
+ - L{UBYTE}
+ - L{USHORT}
+ - L{SHORT}
+ - L{UINT}
+ - L{INT}
+ - L{ULONG}
+ - L{LONG}
+ - L{FLOAT}
+ - L{DOUBLE}
+ - L{BINARY}
+ - L{STRING}
+ - L{SYMBOL}
+
+ The following types of compound values are supported:
+
+ - L{DESCRIBED}
+ - L{ARRAY}
+ - L{LIST}
+ - L{MAP}
+ """
+
+ NULL = PN_NULL; "A null value."
+ BOOL = PN_BOOL; "A boolean value."
+ UBYTE = PN_UBYTE; "An unsigned byte value."
+ BYTE = PN_BYTE; "A signed byte value."
+ USHORT = PN_USHORT; "An unsigned short value."
+ SHORT = PN_SHORT; "A short value."
+ UINT = PN_UINT; "An unsigned int value."
+ INT = PN_INT; "A signed int value."
+ CHAR = PN_CHAR; "A character value."
+ ULONG = PN_ULONG; "An unsigned long value."
+ LONG = PN_LONG; "A signed long value."
+ TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
+ FLOAT = PN_FLOAT; "A float value."
+ DOUBLE = PN_DOUBLE; "A double value."
+ DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
+ DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
+ DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
+ UUID = PN_UUID; "A UUID value."
+ BINARY = PN_BINARY; "A binary string."
+ STRING = PN_STRING; "A unicode string."
+ SYMBOL = PN_SYMBOL; "A symbolic string."
+ DESCRIBED = PN_DESCRIBED; "A described value."
+ ARRAY = PN_ARRAY; "An array value."
+ LIST = PN_LIST; "A list value."
+ MAP = PN_MAP; "A map value."
+
+ type_names = {
+ NULL: "null",
+ BOOL: "bool",
+ BYTE: "byte",
+ UBYTE: "ubyte",
+ SHORT: "short",
+ USHORT: "ushort",
+ INT: "int",
+ UINT: "uint",
+ CHAR: "char",
+ LONG: "long",
+ ULONG: "ulong",
+ TIMESTAMP: "timestamp",
+ FLOAT: "float",
+ DOUBLE: "double",
+ DECIMAL32: "decimal32",
+ DECIMAL64: "decimal64",
+ DECIMAL128: "decimal128",
+ UUID: "uuid",
+ BINARY: "binary",
+ STRING: "string",
+ SYMBOL: "symbol",
+ DESCRIBED: "described",
+ ARRAY: "array",
+ LIST: "list",
+ MAP: "map"
+ }
+
+ @classmethod
+ def type_name(type):
+ return Data.type_names[type]
+
+ def __init__(self, capacity=16):
+ if isinstance(capacity, (int, long)):
+ self._data = pn_data(capacity)
+ self._free = True
+ else:
+ self._data = capacity
+ self._free = False
+
+ def __del__(self):
+ if self._free and hasattr(self, "_data"):
+ pn_data_free(self._data)
+ del self._data
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, DataException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
+ else:
+ return err
+
+ def clear(self):
+ """
+ Clears the data object.
+ """
+ pn_data_clear(self._data)
+
+ def rewind(self):
+ """
+ Clears the current node and sets the parent to the root node. Clearing the
+ current node sets it _before_ the first node, so calling next() will advance
+ to the first node.
+ """
+ assert self._data is not None
+ pn_data_rewind(self._data)
+
+ def next(self):
+ """
+ Advances the current node to its next sibling and returns its
+ type. If there is no next sibling the current node remains
+ unchanged and None is returned.
+ """
+ found = pn_data_next(self._data)
+ if found:
+ return self.type()
+ else:
+ return None
+
+ def prev(self):
+ """
+ Moves the current node back to its previous sibling and returns its
+ type. If there is no previous sibling the current node remains
+ unchanged and None is returned.
+ """
+ found = pn_data_prev(self._data)
+ if found:
+ return self.type()
+ else:
+ return None
+
+ def enter(self):
+ """
+ Sets the parent node to the current node and clears the current node.
+ Clearing the current node sets it _before_ the first child, so
+ calling next() advances to the first child.
+ """
+ return pn_data_enter(self._data)
+
+ def exit(self):
+ """
+ Sets the current node to the parent node and the parent node to
+ its own parent.
+ """
+ return pn_data_exit(self._data)
+
+ def lookup(self, name):
+ return pn_data_lookup(self._data, name)
+
+ def narrow(self):
+ pn_data_narrow(self._data)
+
+ def widen(self):
+ pn_data_widen(self._data)
+
+ def type(self):
+ """
+ Returns the type of the current node.
+ """
+ dtype = pn_data_type(self._data)
+ if dtype == -1:
+ return None
+ else:
+ return dtype
+
+ def encoded_size(self):
+ """
+ Returns the size in bytes needed to encode the data in AMQP format.
+ """
+ return pn_data_encoded_size(self._data)
+
+ def encode(self):
+ """
+ Returns a representation of the data encoded in AMQP format.
+ """
+ size = 1024
+ while True:
+ cd, enc = pn_data_encode(self._data, size)
+ if cd == PN_OVERFLOW:
+ size *= 2
+ elif cd >= 0:
+ return enc
+ else:
+ self._check(cd)
+
+ def decode(self, encoded):
+ """
+ Decodes the first value from supplied AMQP data and returns the
+ number of bytes consumed.
+
+ @type encoded: binary
+ @param encoded: AMQP encoded binary data
+ """
+ return self._check(pn_data_decode(self._data, encoded))
+
+ def put_list(self):
+ """
+ Puts a list value. Elements may be filled by entering the list
+ node and putting element values.
+
+ >>> data = Data()
+ >>> data.put_list()
+ >>> data.enter()
+ >>> data.put_int(1)
+ >>> data.put_int(2)
+ >>> data.put_int(3)
+ >>> data.exit()
+ """
+ self._check(pn_data_put_list(self._data))
+
+ def put_map(self):
+ """
+ Puts a map value. Elements may be filled by entering the map node
+ and putting alternating key value pairs.
+
+ >>> data = Data()
+ >>> data.put_map()
+ >>> data.enter()
+ >>> data.put_string("key")
+ >>> data.put_string("value")
+ >>> data.exit()
+ """
+ self._check(pn_data_put_map(self._data))
+
+ def put_array(self, described, element_type):
+ """
+ Puts an array value. Elements may be filled by entering the array
+ node and putting the element values. The values must all be of the
+ specified array element type. If an array is described then the
+ first child value of the array is the descriptor and may be of any
+ type.
+
+ >>> data = Data()
+ >>>
+ >>> data.put_array(False, Data.INT)
+ >>> data.enter()
+ >>> data.put_int(1)
+ >>> data.put_int(2)
+ >>> data.put_int(3)
+ >>> data.exit()
+ >>>
+ >>> data.put_array(True, Data.DOUBLE)
+ >>> data.enter()
+ >>> data.put_symbol("array-descriptor")
+ >>> data.put_double(1.1)
+ >>> data.put_double(1.2)
+ >>> data.put_double(1.3)
+ >>> data.exit()
+
+ @type described: bool
+ @param described: specifies whether the array is described
+ @type element_type: int
+ @param element_type: the type of the array elements
+ """
+ self._check(pn_data_put_array(self._data, described, element_type))
+
+ def put_described(self):
+ """
+ Puts a described value. A described node has two children, the
+ descriptor and the value. These are specified by entering the node
+ and putting the desired values.
+
+ >>> data = Data()
+ >>> data.put_described()
+ >>> data.enter()
+ >>> data.put_symbol("value-descriptor")
+ >>> data.put_string("the value")
+ >>> data.exit()
+ """
+ self._check(pn_data_put_described(self._data))
+
+ def put_null(self):
+ """
+ Puts a null value.
+ """
+ self._check(pn_data_put_null(self._data))
+
+ def put_bool(self, b):
+ """
+ Puts a boolean value.
+
+ @param b: a boolean value
+ """
+ self._check(pn_data_put_bool(self._data, b))
+
+ def put_ubyte(self, ub):
+ """
+ Puts an unsigned byte value.
+
+ @param ub: an integral value
+ """
+ self._check(pn_data_put_ubyte(self._data, ub))
+
+ def put_byte(self, b):
+ """
+ Puts a signed byte value.
+
+ @param b: an integral value
+ """
+ self._check(pn_data_put_byte(self._data, b))
+
+ def put_ushort(self, us):
+ """
+ Puts an unsigned short value.
+
+ @param us: an integral value.
+ """
+ self._check(pn_data_put_ushort(self._data, us))
+
+ def put_short(self, s):
+ """
+ Puts a signed short value.
+
+ @param s: an integral value
+ """
+ self._check(pn_data_put_short(self._data, s))
+
+ def put_uint(self, ui):
+ """
+ Puts an unsigned int value.
+
+ @param ui: an integral value
+ """
+ self._check(pn_data_put_uint(self._data, ui))
+
+ def put_int(self, i):
+ """
+ Puts a signed int value.
+
+ @param i: an integral value
+ """
+ self._check(pn_data_put_int(self._data, i))
+
+ def put_char(self, c):
+ """
+ Puts a char value.
+
+ @param c: a single character
+ """
+ self._check(pn_data_put_char(self._data, ord(c)))
+
+ def put_ulong(self, ul):
+ """
+ Puts an unsigned long value.
+
+ @param ul: an integral value
+ """
+ self._check(pn_data_put_ulong(self._data, ul))
+
+ def put_long(self, l):
+ """
+ Puts a signed long value.
+
+ @param l: an integral value
+ """
+ self._check(pn_data_put_long(self._data, l))
+
+ def put_timestamp(self, t):
+ """
+ Puts a timestamp value.
+
+ @param t: an integral value
+ """
+ self._check(pn_data_put_timestamp(self._data, t))
+
+ def put_float(self, f):
+ """
+ Puts a float value.
+
+ @param f: a floating point value
+ """
+ self._check(pn_data_put_float(self._data, f))
+
+ def put_double(self, d):
+ """
+ Puts a double value.
+
+ @param d: a floating point value.
+ """
+ self._check(pn_data_put_double(self._data, d))
+
+ def put_decimal32(self, d):
+ """
+ Puts a decimal32 value.
+
+ @param d: a decimal32 value
+ """
+ self._check(pn_data_put_decimal32(self._data, d))
+
+ def put_decimal64(self, d):
+ """
+ Puts a decimal64 value.
+
+ @param d: a decimal64 value
+ """
+ self._check(pn_data_put_decimal64(self._data, d))
+
+ def put_decimal128(self, d):
+ """
+ Puts a decimal128 value.
+
+ @param d: a decimal128 value
+ """
+ self._check(pn_data_put_decimal128(self._data, d))
+
+ def put_uuid(self, u):
+ """
+ Puts a UUID value.
+
+ @param u: a uuid value
+ """
+ self._check(pn_data_put_uuid(self._data, u.bytes))
+
+ def put_binary(self, b):
+ """
+ Puts a binary value.
+
+ @type b: binary
+ @param b: a binary value
+ """
+ self._check(pn_data_put_binary(self._data, b))
+
+ def put_memoryview(self, mv):
+ """Put a python memoryview object as an AMQP binary value"""
+ self.put_binary(mv.tobytes())
+
+ def put_buffer(self, buff):
+ """Put a python buffer object as an AMQP binary value"""
+ self.put_binary(bytes(buff))
+
+ def put_string(self, s):
+ """
+ Puts a unicode value.
+
+ @type s: unicode
+ @param s: a unicode value
+ """
+ self._check(pn_data_put_string(self._data, s.encode("utf8")))
+
+ def put_symbol(self, s):
+ """
+ Puts a symbolic value.
+
+ @type s: string
+ @param s: the symbol name
+ """
+ self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
+
+ def get_list(self):
+ """
+ If the current node is a list, return the number of elements,
+ otherwise return zero. List elements can be accessed by entering
+ the list.
+
+ >>> count = data.get_list()
+ >>> data.enter()
+ >>> for i in range(count):
+ ... type = data.next()
+ ... if type == Data.STRING:
+ ... print data.get_string()
+ ... elif type == ...:
+ ... ...
+ >>> data.exit()
+ """
+ return pn_data_get_list(self._data)
+
+ def get_map(self):
+ """
+ If the current node is a map, return the number of child elements,
+ otherwise return zero. Key value pairs can be accessed by entering
+ the map.
+
+ >>> count = data.get_map()
+ >>> data.enter()
+ >>> for i in range(count/2):
+ ... type = data.next()
+ ... if type == Data.STRING:
+ ... print data.get_string()
+ ... elif type == ...:
+ ... ...
+ >>> data.exit()
+ """
+ return pn_data_get_map(self._data)
+
+ def get_array(self):
+ """
+ If the current node is an array, return a tuple of the element
+ count, a boolean indicating whether the array is described, and
+ the type of each element, otherwise return (0, False, None). Array
+ data can be accessed by entering the array.
+
+ >>> # read an array of strings with a symbolic descriptor
+ >>> count, described, type = data.get_array()
+ >>> data.enter()
+ >>> data.next()
+ >>> print "Descriptor:", data.get_symbol()
+ >>> for i in range(count):
+ ... data.next()
+ ... print "Element:", data.get_string()
+ >>> data.exit()
+ """
+ count = pn_data_get_array(self._data)
+ described = pn_data_is_array_described(self._data)
+ type = pn_data_get_array_type(self._data)
+ if type == -1:
+ type = None
+ return count, described, type
+
+ def is_described(self):
+ """
+ Checks if the current node is a described value. The descriptor
+ and value may be accessed by entering the described value.
+
+ >>> # read a symbolically described string
+ >>> assert data.is_described() # will error if the current node is not described
+ >>> data.enter()
+ >>> data.next()
+ >>> print data.get_symbol()
+ >>> data.next()
+ >>> print data.get_string()
+ >>> data.exit()
+ """
+ return pn_data_is_described(self._data)
+
+ def is_null(self):
+ """
+ Checks if the current node is a null.
+ """
+ return pn_data_is_null(self._data)
+
+ def get_bool(self):
+ """
+ If the current node is a boolean, returns its value, returns False
+ otherwise.
+ """
+ return pn_data_get_bool(self._data)
+
+ def get_ubyte(self):
+ """
+ If the current node is an unsigned byte, returns its value,
+ returns 0 otherwise.
+ """
+ return ubyte(pn_data_get_ubyte(self._data))
+
+ def get_byte(self):
+ """
+ If the current node is a signed byte, returns its value, returns 0
+ otherwise.
+ """
+ return byte(pn_data_get_byte(self._data))
+
+ def get_ushort(self):
+ """
+ If the current node is an unsigned short, returns its value,
+ returns 0 otherwise.
+ """
+ return ushort(pn_data_get_ushort(self._data))
+
+ def get_short(self):
+ """
+ If the current node is a signed short, returns its value, returns
+ 0 otherwise.
+ """
+ return short(pn_data_get_short(self._data))
+
+ def get_uint(self):
+ """
+ If the current node is an unsigned int, returns its value, returns
+ 0 otherwise.
+ """
+ return uint(pn_data_get_uint(self._data))
+
+ def get_int(self):
+ """
+ If the current node is a signed int, returns its value, returns 0
+ otherwise.
+ """
+ return int32(pn_data_get_int(self._data))
+
+ def get_char(self):
+ """
+ If the current node is a char, returns its value, returns 0
+ otherwise.
+ """
+ return char(_compat.unichr(pn_data_get_char(self._data)))
+
+ def get_ulong(self):
+ """
+ If the current node is an unsigned long, returns its value,
+ returns 0 otherwise.
+ """
+ return ulong(pn_data_get_ulong(self._data))
+
+ def get_long(self):
+ """
+ If the current node is a signed long, returns its value, returns
+ 0 otherwise.
+ """
+ return long(pn_data_get_long(self._data))
+
+ def get_timestamp(self):
+ """
+ If the current node is a timestamp, returns its value, returns 0
+ otherwise.
+ """
+ return timestamp(pn_data_get_timestamp(self._data))
+
+ def get_float(self):
+ """
+ If the current node is a float, returns its value, returns 0
+ otherwise.
+ """
+ return float32(pn_data_get_float(self._data))
+
+ def get_double(self):
+ """
+ If the current node is a double, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_double(self._data)
+
+ # XXX: need to convert
+ def get_decimal32(self):
+ """
+ If the current node is a decimal32, returns its value, returns 0
+ otherwise.
+ """
+ return decimal32(pn_data_get_decimal32(self._data))
+
+ # XXX: need to convert
+ def get_decimal64(self):
+ """
+ If the current node is a decimal64, returns its value, returns 0
+ otherwise.
+ """
+ return decimal64(pn_data_get_decimal64(self._data))
+
+ # XXX: need to convert
+ def get_decimal128(self):
+ """
+ If the current node is a decimal128, returns its value, returns 0
+ otherwise.
+ """
+ return decimal128(pn_data_get_decimal128(self._data))
+
+ def get_uuid(self):
+ """
+ If the current node is a UUID, returns its value, returns None
+ otherwise.
+ """
+ if pn_data_type(self._data) == Data.UUID:
+ return uuid.UUID(bytes=pn_data_get_uuid(self._data))
+ else:
+ return None
+
+ def get_binary(self):
+ """
+ If the current node is binary, returns its value, returns ""
+ otherwise.
+ """
+ return pn_data_get_binary(self._data)
+
+ def get_string(self):
+ """
+ If the current node is a string, returns its value, returns ""
+ otherwise.
+ """
+ return pn_data_get_string(self._data).decode("utf8")
+
+ def get_symbol(self):
+ """
+ If the current node is a symbol, returns its value, returns ""
+ otherwise.
+ """
+ return symbol(pn_data_get_symbol(self._data).decode('ascii'))
+
+ def copy(self, src):
+ self._check(pn_data_copy(self._data, src._data))
+
+ def format(self):
+ sz = 16
+ while True:
+ err, result = pn_data_format(self._data, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return result
+
+ def dump(self):
+ pn_data_dump(self._data)
+
+ def put_dict(self, d):
+ self.put_map()
+ self.enter()
+ try:
+ for k, v in d.items():
+ self.put_object(k)
+ self.put_object(v)
+ finally:
+ self.exit()
+
+ def get_dict(self):
+ if self.enter():
+ try:
+ result = {}
+ while self.next():
+ k = self.get_object()
+ if self.next():
+ v = self.get_object()
+ else:
+ v = None
+ result[k] = v
+ finally:
+ self.exit()
+ return result
+
+ def put_sequence(self, s):
+ self.put_list()
+ self.enter()
+ try:
+ for o in s:
+ self.put_object(o)
+ finally:
+ self.exit()
+
+ def get_sequence(self):
+ if self.enter():
+ try:
+ result = []
+ while self.next():
+ result.append(self.get_object())
+ finally:
+ self.exit()
+ return result
+
+ def get_py_described(self):
+ if self.enter():
+ try:
+ self.next()
+ descriptor = self.get_object()
+ self.next()
+ value = self.get_object()
+ finally:
+ self.exit()
+ return Described(descriptor, value)
+
+ def put_py_described(self, d):
+ self.put_described()
+ self.enter()
+ try:
+ self.put_object(d.descriptor)
+ self.put_object(d.value)
+ finally:
+ self.exit()
+
+ def get_py_array(self):
+ """
+ If the current node is an array, return an Array object
+ representing the array and its contents. Otherwise return None.
+ This is a convenience wrapper around get_array, enter, etc.
+ """
+
+ count, described, type = self.get_array()
+ if type is None: return None
+ if self.enter():
+ try:
+ if described:
+ self.next()
+ descriptor = self.get_object()
+ else:
+ descriptor = UNDESCRIBED
+ elements = []
+ while self.next():
+ elements.append(self.get_object())
+ finally:
+ self.exit()
+ return Array(descriptor, type, *elements)
+
+ def put_py_array(self, a):
+ described = a.descriptor != UNDESCRIBED
+ self.put_array(described, a.type)
+ self.enter()
+ try:
+ if described:
+ self.put_object(a.descriptor)
+ for e in a.elements:
+ self.put_object(e)
+ finally:
+ self.exit()
+
+ put_mappings = {
+ None.__class__: lambda s, _: s.put_null(),
+ bool: put_bool,
+ ubyte: put_ubyte,
+ ushort: put_ushort,
+ uint: put_uint,
+ ulong: put_ulong,
+ byte: put_byte,
+ short: put_short,
+ int32: put_int,
+ long: put_long,
+ float32: put_float,
+ float: put_double,
+ decimal32: put_decimal32,
+ decimal64: put_decimal64,
+ decimal128: put_decimal128,
+ char: put_char,
+ timestamp: put_timestamp,
+ uuid.UUID: put_uuid,
+ bytes: put_binary,
+ unicode: put_string,
+ symbol: put_symbol,
+ list: put_sequence,
+ tuple: put_sequence,
+ dict: put_dict,
+ Described: put_py_described,
+ Array: put_py_array
+ }
+ # for python 3.x, long is merely an alias for int, but for python 2.x
+ # we need to add an explicit int since it is a different type
+ if int not in put_mappings:
+ put_mappings[int] = put_int
+ # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both.
+ try:
+ put_mappings[memoryview] = put_memoryview
+ except NameError:
+ pass
+ try:
+ put_mappings[buffer] = put_buffer
+ except NameError:
+ pass
+ get_mappings = {
+ NULL: lambda s: None,
+ BOOL: get_bool,
+ BYTE: get_byte,
+ UBYTE: get_ubyte,
+ SHORT: get_short,
+ USHORT: get_ushort,
+ INT: get_int,
+ UINT: get_uint,
+ CHAR: get_char,
+ LONG: get_long,
+ ULONG: get_ulong,
+ TIMESTAMP: get_timestamp,
+ FLOAT: get_float,
+ DOUBLE: get_double,
+ DECIMAL32: get_decimal32,
+ DECIMAL64: get_decimal64,
+ DECIMAL128: get_decimal128,
+ UUID: get_uuid,
+ BINARY: get_binary,
+ STRING: get_string,
+ SYMBOL: get_symbol,
+ DESCRIBED: get_py_described,
+ ARRAY: get_py_array,
+ LIST: get_sequence,
+ MAP: get_dict
+ }
+
+ def put_object(self, obj):
+ putter = self.put_mappings[obj.__class__]
+ putter(self, obj)
+
+ def get_object(self):
+ type = self.type()
+ if type is None: return None
+ getter = self.get_mappings.get(type)
+ if getter:
+ return getter(self)
+ else:
+ return UnmappedType(str(type))
+
+
+def dat2obj(dimpl):
+ if dimpl:
+ d = Data(dimpl)
+ d.rewind()
+ d.next()
+ obj = d.get_object()
+ d.rewind()
+ return obj
+
+
+def obj2dat(obj, dimpl):
+ if obj is not None:
+ d = Data(dimpl)
+ d.put_object(obj)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_delivery.py
----------------------------------------------------------------------
diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py
new file mode 100644
index 0000000..e609451
--- /dev/null
+++ b/python/proton/_delivery.py
@@ -0,0 +1,293 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_REJECTED, PN_RELEASED, PN_MODIFIED, PN_RECEIVED, PN_ACCEPTED, \
+ pn_disposition_is_undeliverable, pn_disposition_set_section_number, pn_disposition_get_section_number, \
+ pn_disposition_set_undeliverable, pn_disposition_set_failed, pn_disposition_condition, \
+ pn_disposition_set_section_offset, pn_disposition_data, pn_disposition_get_section_offset, \
+ pn_disposition_is_failed, pn_disposition_annotations, \
+ pn_delivery_partial, pn_delivery_aborted, pn_disposition_type, pn_delivery_pending, pn_delivery_updated, \
+ pn_delivery_readable, pn_delivery_abort, pn_delivery_remote, pn_delivery_tag, pn_delivery_link, pn_delivery_local, \
+ pn_delivery_update, pn_delivery_attachments, pn_delivery_local_state, pn_delivery_settled, pn_delivery_settle, \
+ pn_delivery_writable, pn_delivery_remote_state, \
+ pn_work_next
+
+from ._condition import cond2obj, obj2cond
+from ._data import dat2obj, obj2dat
+from ._wrapper import Wrapper
+
+
+class NamedInt(int):
+ values = {} # type: Dict[int, str]
+
+ def __new__(cls, i, name):
+ ni = super(NamedInt, cls).__new__(cls, i)
+ cls.values[i] = ni
+ return ni
+
+ def __init__(self, i, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.name
+
+ def __str__(self):
+ return self.name
+
+ @classmethod
+ def get(cls, i):
+ return cls.values.get(i, i)
+
+
+class DispositionType(NamedInt):
+ # Own registry: instances created as DispositionType are recorded here
+ # rather than in the dict defined on NamedInt, so DispositionType.get()
+ # only resolves values created through this class.
+ values = {}
+
+
+class Disposition(object):
+ RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
+ ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
+ REJECTED = DispositionType(PN_REJECTED, "REJECTED")
+ RELEASED = DispositionType(PN_RELEASED, "RELEASED")
+ MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
+
+ def __init__(self, impl, local):
+ self._impl = impl
+ self.local = local
+ self._data = None
+ self._condition = None
+ self._annotations = None
+
+ @property
+ def type(self):
+ return DispositionType.get(pn_disposition_type(self._impl))
+
+ def _get_section_number(self):
+ return pn_disposition_get_section_number(self._impl)
+
+ def _set_section_number(self, n):
+ pn_disposition_set_section_number(self._impl, n)
+
+ section_number = property(_get_section_number, _set_section_number)
+
+ def _get_section_offset(self):
+ return pn_disposition_get_section_offset(self._impl)
+
+ def _set_section_offset(self, n):
+ pn_disposition_set_section_offset(self._impl, n)
+
+ section_offset = property(_get_section_offset, _set_section_offset)
+
+ def _get_failed(self):
+ return pn_disposition_is_failed(self._impl)
+
+ def _set_failed(self, b):
+ pn_disposition_set_failed(self._impl, b)
+
+ failed = property(_get_failed, _set_failed)
+
+ def _get_undeliverable(self):
+ return pn_disposition_is_undeliverable(self._impl)
+
+ def _set_undeliverable(self, b):
+ pn_disposition_set_undeliverable(self._impl, b)
+
+ undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+ def _get_data(self):
+ if self.local:
+ return self._data
+ else:
+ return dat2obj(pn_disposition_data(self._impl))
+
+ def _set_data(self, obj):
+ if self.local:
+ self._data = obj
+ else:
+ raise AttributeError("data attribute is read-only")
+
+ data = property(_get_data, _set_data)
+
+ def _get_annotations(self):
+ if self.local:
+ return self._annotations
+ else:
+ return dat2obj(pn_disposition_annotations(self._impl))
+
+ def _set_annotations(self, obj):
+ if self.local:
+ self._annotations = obj
+ else:
+ raise AttributeError("annotations attribute is read-only")
+
+ annotations = property(_get_annotations, _set_annotations)
+
+ def _get_condition(self):
+ if self.local:
+ return self._condition
+ else:
+ return cond2obj(pn_disposition_condition(self._impl))
+
+ def _set_condition(self, obj):
+ if self.local:
+ self._condition = obj
+ else:
+ raise AttributeError("condition attribute is read-only")
+
+ condition = property(_get_condition, _set_condition)
+
+
+class Delivery(Wrapper):
+ """
+ Tracks and/or records the delivery of a message over a link.
+ """
+
+ RECEIVED = Disposition.RECEIVED
+ ACCEPTED = Disposition.ACCEPTED
+ REJECTED = Disposition.REJECTED
+ RELEASED = Disposition.RELEASED
+ MODIFIED = Disposition.MODIFIED
+
+ @staticmethod
+ def wrap(impl):
+ """Return a Delivery wrapping impl, or None when impl is None."""
+ if impl is None:
+ return None
+ else:
+ return Delivery(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_delivery_attachments)
+
+ def _init(self):
+ # The local disposition is created with local=True, so its data,
+ # annotations and condition can be assigned from Python; the remote
+ # one is created with local=False and is read-only.
+ self.local = Disposition(pn_delivery_local(self._impl), True)
+ self.remote = Disposition(pn_delivery_remote(self._impl), False)
+
+ @property
+ def tag(self):
+ """The identifier for the delivery."""
+ return pn_delivery_tag(self._impl)
+
+ @property
+ def writable(self):
+ """Returns true for an outgoing delivery to which data can now be written."""
+ return pn_delivery_writable(self._impl)
+
+ @property
+ def readable(self):
+ """Returns true for an incoming delivery that has data to read."""
+ return pn_delivery_readable(self._impl)
+
+ @property
+ def updated(self):
+ """Returns true if the state of the delivery has been updated
+ (e.g. it has been settled and/or accepted, rejected etc)."""
+ return pn_delivery_updated(self._impl)
+
+ def update(self, state):
+ """
+ Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
+ """
+ # Copy the values held on the Python-side local disposition into the
+ # underlying disposition before the new state is applied.
+ obj2dat(self.local._data, pn_disposition_data(self.local._impl))
+ obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
+ obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
+ pn_delivery_update(self._impl, state)
+
+ @property
+ def pending(self):
+ """Returns the value reported by pn_delivery_pending for this delivery."""
+ return pn_delivery_pending(self._impl)
+
+ @property
+ def partial(self):
+ """
+ Returns true for an incoming delivery if not all the data is
+ yet available.
+ """
+ return pn_delivery_partial(self._impl)
+
+ @property
+ def local_state(self):
+ """Returns the local state of the delivery."""
+ return DispositionType.get(pn_delivery_local_state(self._impl))
+
+ @property
+ def remote_state(self):
+ """
+ Returns the state of the delivery as indicated by the remote
+ peer.
+ """
+ return DispositionType.get(pn_delivery_remote_state(self._impl))
+
+ @property
+ def settled(self):
+ """
+ Returns true if the delivery has been settled by the remote peer.
+ """
+ return pn_delivery_settled(self._impl)
+
+ def settle(self):
+ """
+ Settles the delivery locally. This indicates the application
+ considers the delivery complete and does not wish to receive any
+ further events about it. Every delivery should be settled locally.
+ """
+ pn_delivery_settle(self._impl)
+
+ @property
+ def aborted(self):
+ """Returns true if the delivery has been aborted."""
+ return pn_delivery_aborted(self._impl)
+
+ def abort(self):
+ """
+ Aborts the delivery. This indicates the application wishes to
+ invalidate any data that may have already been sent on this delivery.
+ The delivery cannot be aborted after it has been completely delivered.
+ """
+ pn_delivery_abort(self._impl)
+
+ @property
+ def work_next(self):
+ """The delivery returned by pn_work_next for this one, wrapped, or None."""
+ return Delivery.wrap(pn_work_next(self._impl))
+
+ @property
+ def link(self):
+ """
+ Returns the link on which the delivery was sent or received.
+ """
+ # imported here rather than at module level because _endpoints itself
+ # imports Delivery from this module
+ from . import _endpoints
+ return _endpoints.Link.wrap(pn_delivery_link(self._impl))
+
+ @property
+ def session(self):
+ """
+ Returns the session over which the delivery was sent or received.
+ """
+ return self.link.session
+
+ @property
+ def connection(self):
+ """
+ Returns the connection over which the delivery was sent or received.
+ """
+ return self.session.connection
+
+ @property
+ def transport(self):
+ """Returns the transport of the connection the delivery belongs to."""
+ return self.connection.transport
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_endpoints.py
----------------------------------------------------------------------
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
new file mode 100644
index 0000000..bfa9880
--- /dev/null
+++ b/python/proton/_endpoints.py
@@ -0,0 +1,765 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The proton.endpoints module
+"""
+
+from __future__ import absolute_import
+
+import weakref
+
+from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \
+ PN_REMOTE_CLOSED, \
+ pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \
+ pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \
+ pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \
+ pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \
+ pn_connection_set_password, pn_connection_remote_container, pn_connection_remote_hostname, \
+ pn_connection_remote_offered_capabilities, pn_connection_remote_desired_capabilities, \
+ pn_connection_remote_properties, pn_connection_offered_capabilities, pn_connection_desired_capabilities, \
+ pn_connection_properties, pn_connection_open, pn_connection_close, pn_connection_state, pn_connection_release, \
+ pn_session, pn_session_head, pn_session_attachments, pn_session_condition, pn_session_remote_condition, \
+ pn_session_get_incoming_capacity, pn_session_set_incoming_capacity, pn_session_get_outgoing_window, \
+ pn_session_set_outgoing_window, pn_session_incoming_bytes, pn_session_outgoing_bytes, pn_session_open, \
+ pn_session_close, pn_session_next, pn_session_state, pn_session_connection, pn_session_free, \
+ PN_SND_UNSETTLED, PN_SND_SETTLED, PN_SND_MIXED, PN_RCV_FIRST, PN_RCV_SECOND, \
+ pn_link_head, pn_link_is_sender, pn_link_attachments, pn_link_error, pn_link_condition, pn_link_remote_condition, \
+ pn_link_open, pn_link_close, pn_link_state, pn_link_source, pn_link_target, pn_link_remote_source, \
+ pn_link_remote_target, pn_link_session, pn_link_current, pn_link_advance, pn_link_unsettled, pn_link_credit, \
+ pn_link_available, pn_link_queued, pn_link_next, pn_link_name, pn_link_is_receiver, pn_link_remote_snd_settle_mode, \
+ pn_link_remote_rcv_settle_mode, pn_link_snd_settle_mode, pn_link_set_snd_settle_mode, pn_link_rcv_settle_mode, \
+ pn_link_set_rcv_settle_mode, pn_link_get_drain, pn_link_set_drain, pn_link_drained, pn_link_remote_max_message_size, \
+ pn_link_max_message_size, pn_link_set_max_message_size, pn_link_detach, pn_link_free, pn_link_offered, pn_link_send, \
+ pn_link_flow, pn_link_recv, pn_link_drain, pn_link_draining, \
+ pn_sender, pn_receiver, \
+ PN_UNSPECIFIED, PN_SOURCE, PN_TARGET, PN_COORDINATOR, PN_NONDURABLE, PN_CONFIGURATION, \
+ PN_DELIVERIES, PN_DIST_MODE_UNSPECIFIED, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, PN_EXPIRE_WITH_LINK, \
+ PN_EXPIRE_WITH_SESSION, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_NEVER, \
+ pn_terminus_set_durability, pn_terminus_set_timeout, pn_terminus_set_dynamic, pn_terminus_get_type, \
+ pn_terminus_get_durability, pn_terminus_set_type, pn_terminus_get_address, pn_terminus_capabilities, \
+ pn_terminus_set_address, pn_terminus_get_timeout, pn_terminus_filter, pn_terminus_properties, \
+ pn_terminus_get_expiry_policy, pn_terminus_set_expiry_policy, pn_terminus_set_distribution_mode, \
+ pn_terminus_get_distribution_mode, pn_terminus_copy, pn_terminus_outcomes, pn_terminus_is_dynamic, \
+ PN_EOS, \
+ pn_delivery, \
+ pn_work_head, \
+ pn_error_code, pn_error_text
+
+from ._common import utf82unicode, unicode2utf8
+from ._condition import obj2cond, cond2obj
+from ._data import Data, obj2dat, dat2obj
+from ._delivery import Delivery
+from ._exceptions import EXCEPTIONS, LinkException, SessionException, ConnectionException
+from ._transport import Transport
+from ._wrapper import Wrapper
+
+
+class Endpoint(object):
+ LOCAL_UNINIT = PN_LOCAL_UNINIT
+ REMOTE_UNINIT = PN_REMOTE_UNINIT
+ LOCAL_ACTIVE = PN_LOCAL_ACTIVE
+ REMOTE_ACTIVE = PN_REMOTE_ACTIVE
+ LOCAL_CLOSED = PN_LOCAL_CLOSED
+ REMOTE_CLOSED = PN_REMOTE_CLOSED
+
+ def _init(self):
+ self.condition = None
+
+ def _update_cond(self):
+ obj2cond(self.condition, self._get_cond_impl())
+
+ @property
+ def remote_condition(self):
+ return cond2obj(self._get_remote_cond_impl())
+
+ # the following must be provided by subclasses
+ def _get_cond_impl(self):
+ assert False, "Subclass must override this!"
+
+ def _get_remote_cond_impl(self):
+ assert False, "Subclass must override this!"
+
+ def _get_handler(self):
+ from . import reactor
+ from . import _reactor_impl
+ ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+ if ractor:
+ on_error = ractor.on_error_delegate()
+ else:
+ on_error = None
+ record = self._get_attachments()
+ return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
+
+ def _set_handler(self, handler):
+ from . import reactor
+ from . import _reactor_impl
+ ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
+ if ractor:
+ on_error = ractor.on_error_delegate()
+ else:
+ on_error = None
+ impl = _reactor_impl._chandler(handler, on_error)
+ record = self._get_attachments()
+ pn_record_set_handler(record, impl)
+ pn_decref(impl)
+
+ handler = property(_get_handler, _set_handler)
+
+ @property
+ def transport(self):
+ return self.connection.transport
+
+
+class Connection(Wrapper, Endpoint):
+ """
+ A representation of an AMQP connection
+ """
+
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ return Connection(impl)
+
+ def __init__(self, impl=pn_connection):
+ Wrapper.__init__(self, impl, pn_connection_attachments)
+
+ def _init(self):
+ Endpoint._init(self)
+ self.offered_capabilities = None
+ self.desired_capabilities = None
+ self.properties = None
+
+ def _get_attachments(self):
+ return pn_connection_attachments(self._impl)
+
+ @property
+ def connection(self):
+ return self
+
+ @property
+ def transport(self):
+ return Transport.wrap(pn_connection_transport(self._impl))
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, ConnectionException)
+ raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
+ else:
+ return err
+
+ def _get_cond_impl(self):
+ return pn_connection_condition(self._impl)
+
+ def _get_remote_cond_impl(self):
+ return pn_connection_remote_condition(self._impl)
+
+ def collect(self, collector):
+ if collector is None:
+ pn_connection_collect(self._impl, None)
+ else:
+ pn_connection_collect(self._impl, collector._impl)
+ self._collector = weakref.ref(collector)
+
+ def _get_container(self):
+ return utf82unicode(pn_connection_get_container(self._impl))
+
+ def _set_container(self, name):
+ return pn_connection_set_container(self._impl, unicode2utf8(name))
+
+ container = property(_get_container, _set_container)
+
+ def _get_hostname(self):
+ return utf82unicode(pn_connection_get_hostname(self._impl))
+
+ def _set_hostname(self, name):
+ return pn_connection_set_hostname(self._impl, unicode2utf8(name))
+
+ hostname = property(_get_hostname, _set_hostname,
+ doc="""
+Set the name of the host (either fully qualified or relative) to which this
+connection is connecting to. This information may be used by the remote
+peer to determine the correct back-end service to connect the client to.
+This value will be sent in the Open performative, and will be used by SSL
+and SASL layers to identify the peer.
+""")
+
+ def _get_user(self):
+ return utf82unicode(pn_connection_get_user(self._impl))
+
+ def _set_user(self, name):
+ return pn_connection_set_user(self._impl, unicode2utf8(name))
+
+ user = property(_get_user, _set_user)
+
+ def _get_password(self):
+ return None
+
+ def _set_password(self, name):
+ return pn_connection_set_password(self._impl, unicode2utf8(name))
+
+ password = property(_get_password, _set_password)
+
+ @property
+ def remote_container(self):
+ """The container identifier specified by the remote peer for this connection."""
+ return pn_connection_remote_container(self._impl)
+
+ @property
+ def remote_hostname(self):
+ """The hostname specified by the remote peer for this connection."""
+ return pn_connection_remote_hostname(self._impl)
+
+ @property
+ def remote_offered_capabilities(self):
+ """The capabilities offered by the remote peer for this connection."""
+ return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
+
+ @property
+ def remote_desired_capabilities(self):
+ """The capabilities desired by the remote peer for this connection."""
+ return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
+
+ @property
+ def remote_properties(self):
+ """The properties specified by the remote peer for this connection."""
+ return dat2obj(pn_connection_remote_properties(self._impl))
+
+ def open(self):
+ """
+ Opens the connection.
+
+ In more detail, this moves the local state of the connection to
+ the ACTIVE state and triggers an open frame to be sent to the
+ peer. A connection is fully active once both peers have opened it.
+ """
+ obj2dat(self.offered_capabilities,
+ pn_connection_offered_capabilities(self._impl))
+ obj2dat(self.desired_capabilities,
+ pn_connection_desired_capabilities(self._impl))
+ obj2dat(self.properties, pn_connection_properties(self._impl))
+ pn_connection_open(self._impl)
+
+ def close(self):
+ """
+ Closes the connection.
+
+ In more detail, this moves the local state of the connection to
+ the CLOSED state and triggers a close frame to be sent to the
+ peer. A connection is fully closed once both peers have closed it.
+ """
+ self._update_cond()
+ pn_connection_close(self._impl)
+ if hasattr(self, '_session_policy'):
+ # break circular ref
+ del self._session_policy
+
+ @property
+ def state(self):
+ """
+ The state of the connection as a bit field. The state has a local
+ and a remote component. Each of these can be in one of three
+ states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
+ against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
+ REMOTE_ACTIVE and REMOTE_CLOSED.
+ """
+ return pn_connection_state(self._impl)
+
+ def session(self):
+ """
+ Returns a new session on this connection.
+ """
+ ssn = pn_session(self._impl)
+ if ssn is None:
+ raise (SessionException("Session allocation failed."))
+ else:
+ return Session(ssn)
+
+ def session_head(self, mask):
+ return Session.wrap(pn_session_head(self._impl, mask))
+
+ def link_head(self, mask):
+ return Link.wrap(pn_link_head(self._impl, mask))
+
+ @property
+ def work_head(self):
+ return Delivery.wrap(pn_work_head(self._impl))
+
+ @property
+ def error(self):
+ return pn_error_code(pn_connection_error(self._impl))
+
+ def free(self):
+ pn_connection_release(self._impl)
+
+
+class Session(Wrapper, Endpoint):
+ """
+ Wrapper around a session implementation object, as created by
+ Connection.session() or obtained through Session.wrap().
+ """
+
+ @staticmethod
+ def wrap(impl):
+ """Return a Session wrapping impl, or None when impl is None."""
+ if impl is None:
+ return None
+ else:
+ return Session(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_session_attachments)
+
+ def _get_attachments(self):
+ return pn_session_attachments(self._impl)
+
+ def _get_cond_impl(self):
+ return pn_session_condition(self._impl)
+
+ def _get_remote_cond_impl(self):
+ return pn_session_remote_condition(self._impl)
+
+ def _get_incoming_capacity(self):
+ return pn_session_get_incoming_capacity(self._impl)
+
+ def _set_incoming_capacity(self, capacity):
+ pn_session_set_incoming_capacity(self._impl, capacity)
+
+ # read/write view of the session's incoming capacity
+ incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
+
+ def _get_outgoing_window(self):
+ return pn_session_get_outgoing_window(self._impl)
+
+ def _set_outgoing_window(self, window):
+ pn_session_set_outgoing_window(self._impl, window)
+
+ # read/write view of the session's outgoing window
+ outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+
+ @property
+ def outgoing_bytes(self):
+ """The value reported by pn_session_outgoing_bytes for this session."""
+ return pn_session_outgoing_bytes(self._impl)
+
+ @property
+ def incoming_bytes(self):
+ """The value reported by pn_session_incoming_bytes for this session."""
+ return pn_session_incoming_bytes(self._impl)
+
+ def open(self):
+ """Open the session by calling pn_session_open."""
+ pn_session_open(self._impl)
+
+ def close(self):
+ """
+ Close the session: self.condition is first copied to the session's
+ local condition, then pn_session_close is called.
+ """
+ self._update_cond()
+ pn_session_close(self._impl)
+
+ def next(self, mask):
+ """Wrapped result of pn_session_next for the given mask, or None."""
+ return Session.wrap(pn_session_next(self._impl, mask))
+
+ @property
+ def state(self):
+ """The value reported by pn_session_state for this session."""
+ return pn_session_state(self._impl)
+
+ @property
+ def connection(self):
+ """The connection this session belongs to, wrapped, or None."""
+ return Connection.wrap(pn_session_connection(self._impl))
+
+ def sender(self, name):
+ """Create a sender with the given name on this session and return it as a Sender."""
+ return Sender(pn_sender(self._impl, unicode2utf8(name)))
+
+ def receiver(self, name):
+ """Create a receiver with the given name on this session and return it as a Receiver."""
+ return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
+
+ def free(self):
+ pn_session_free(self._impl)
+
+
+class Link(Wrapper, Endpoint):
+ """
+ A representation of an AMQP link, of which there are two concrete
+ implementations, Sender and Receiver.
+ """
+
+ SND_UNSETTLED = PN_SND_UNSETTLED
+ SND_SETTLED = PN_SND_SETTLED
+ SND_MIXED = PN_SND_MIXED
+
+ RCV_FIRST = PN_RCV_FIRST
+ RCV_SECOND = PN_RCV_SECOND
+
+ @staticmethod
+ def wrap(impl):
+ """
+ Wrap impl as a Sender when pn_link_is_sender(impl) is true and as a
+ Receiver otherwise; a None impl yields None.
+ """
+ if impl is None: return None
+ if pn_link_is_sender(impl):
+ return Sender(impl)
+ else:
+ return Receiver(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_link_attachments)
+
+ def _get_attachments(self):
+ return pn_link_attachments(self._impl)
+
+ def _check(self, err):
+ """
+ Return err unchanged when it is non-negative; otherwise raise the
+ exception class registered for err in EXCEPTIONS (LinkException by
+ default) carrying the error code and the link's error text.
+ """
+ if err < 0:
+ exc = EXCEPTIONS.get(err, LinkException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
+ else:
+ return err
+
+ def _get_cond_impl(self):
+ return pn_link_condition(self._impl)
+
+ def _get_remote_cond_impl(self):
+ return pn_link_remote_condition(self._impl)
+
+ def open(self):
+ """
+ Opens the link.
+
+ In more detail, this moves the local state of the link to the
+ ACTIVE state and triggers an attach frame to be sent to the
+ peer. A link is fully active once both peers have attached it.
+ """
+ pn_link_open(self._impl)
+
+ def close(self):
+ """
+ Closes the link.
+
+ In more detail, this moves the local state of the link to the
+ CLOSED state and triggers an detach frame (with the closed flag
+ set) to be sent to the peer. A link is fully closed once both
+ peers have detached it.
+ """
+ self._update_cond()
+ pn_link_close(self._impl)
+
+ @property
+ def state(self):
+ """
+ The state of the link as a bit field. The state has a local
+ and a remote component. Each of these can be in one of three
+ states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
+ against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
+ REMOTE_ACTIVE and REMOTE_CLOSED.
+ """
+ return pn_link_state(self._impl)
+
+ @property
+ def source(self):
+ """The source of the link as described by the local peer."""
+ return Terminus(pn_link_source(self._impl))
+
+ @property
+ def target(self):
+ """The target of the link as described by the local peer."""
+ return Terminus(pn_link_target(self._impl))
+
+ @property
+ def remote_source(self):
+ """The source of the link as described by the remote peer."""
+ return Terminus(pn_link_remote_source(self._impl))
+
+ @property
+ def remote_target(self):
+ """The target of the link as described by the remote peer."""
+ return Terminus(pn_link_remote_target(self._impl))
+
+ @property
+ def session(self):
+ """The session this link belongs to, wrapped, or None."""
+ return Session.wrap(pn_link_session(self._impl))
+
+ @property
+ def connection(self):
+ """The connection on which this link was attached."""
+ return self.session.connection
+
+ def delivery(self, tag):
+ """Create a delivery with the given tag on this link and return it as a Delivery."""
+ return Delivery(pn_delivery(self._impl, tag))
+
+ @property
+ def current(self):
+ """The delivery reported by pn_link_current for this link, wrapped, or None."""
+ return Delivery.wrap(pn_link_current(self._impl))
+
+ def advance(self):
+ """Call pn_link_advance on this link and return its result."""
+ return pn_link_advance(self._impl)
+
+ @property
+ def unsettled(self):
+ """The value reported by pn_link_unsettled for this link."""
+ return pn_link_unsettled(self._impl)
+
+ @property
+ def credit(self):
+ """The amount of outstanding credit on this link."""
+ return pn_link_credit(self._impl)
+
+ @property
+ def available(self):
+ """The value reported by pn_link_available for this link."""
+ return pn_link_available(self._impl)
+
+ @property
+ def queued(self):
+ """The value reported by pn_link_queued for this link."""
+ return pn_link_queued(self._impl)
+
+ def next(self, mask):
+ """Wrapped result of pn_link_next for the given mask, or None."""
+ return Link.wrap(pn_link_next(self._impl, mask))
+
+ @property
+ def name(self):
+ """Returns the name of the link"""
+ return utf82unicode(pn_link_name(self._impl))
+
+ @property
+ def is_sender(self):
+ """Returns true if this link is a sender."""
+ return pn_link_is_sender(self._impl)
+
+ @property
+ def is_receiver(self):
+ """Returns true if this link is a receiver."""
+ return pn_link_is_receiver(self._impl)
+
+ @property
+ def remote_snd_settle_mode(self):
+ """The value reported by pn_link_remote_snd_settle_mode for this link."""
+ return pn_link_remote_snd_settle_mode(self._impl)
+
+ @property
+ def remote_rcv_settle_mode(self):
+ """The value reported by pn_link_remote_rcv_settle_mode for this link."""
+ return pn_link_remote_rcv_settle_mode(self._impl)
+
+ def _get_snd_settle_mode(self):
+ return pn_link_snd_settle_mode(self._impl)
+
+ def _set_snd_settle_mode(self, mode):
+ pn_link_set_snd_settle_mode(self._impl, mode)
+
+ # presumably one of SND_UNSETTLED, SND_SETTLED or SND_MIXED -- TODO confirm
+ snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
+
+ def _get_rcv_settle_mode(self):
+ return pn_link_rcv_settle_mode(self._impl)
+
+ def _set_rcv_settle_mode(self, mode):
+ pn_link_set_rcv_settle_mode(self._impl, mode)
+
+ # presumably one of RCV_FIRST or RCV_SECOND -- TODO confirm
+ rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
+
+ def _get_drain(self):
+ return pn_link_get_drain(self._impl)
+
+ def _set_drain(self, b):
+ # the assigned value is coerced to bool before being passed down
+ pn_link_set_drain(self._impl, bool(b))
+
+ drain_mode = property(_get_drain, _set_drain)
+
+ def drained(self):
+ """Call pn_link_drained on this link and return its result."""
+ return pn_link_drained(self._impl)
+
+ @property
+ def remote_max_message_size(self):
+ """The value reported by pn_link_remote_max_message_size for this link."""
+ return pn_link_remote_max_message_size(self._impl)
+
+ def _get_max_message_size(self):
+ return pn_link_max_message_size(self._impl)
+
+ def _set_max_message_size(self, mode):
+ # NOTE(review): the parameter is called 'mode' but carries the value
+ # handed to pn_link_set_max_message_size
+ pn_link_set_max_message_size(self._impl, mode)
+
+ max_message_size = property(_get_max_message_size, _set_max_message_size)
+
+ def detach(self):
+ """Call pn_link_detach on this link and return its result."""
+ return pn_link_detach(self._impl)
+
+ def free(self):
+ pn_link_free(self._impl)
+
+
+class Sender(Link):
+ """
+ A link over which messages are sent.
+ """
+
+ def offered(self, n):
+ pn_link_offered(self._impl, n)
+
+ def stream(self, data):
+ """
+ Send specified data as part of the current delivery
+
+ @type data: binary
+ @param data: data to send
+ """
+ return self._check(pn_link_send(self._impl, data))
+
+ def send(self, obj, tag=None):
+ """
+ Send specified object over this sender; the object is expected to
+ have a send() method on it that takes the sender and an optional
+ tag as arguments.
+
+ Where the object is a Message, this will send the message over
+ this link, creating a new delivery for the purpose.
+ """
+ if hasattr(obj, 'send'):
+ return obj.send(self, tag=tag)
+ else:
+ # treat object as bytes
+ return self.stream(obj)
+
+ def delivery_tag(self):
+ if not hasattr(self, 'tag_generator'):
+ def simple_tags():
+ count = 1
+ while True:
+ yield str(count)
+ count += 1
+
+ self.tag_generator = simple_tags()
+ return next(self.tag_generator)
+
+
+class Receiver(Link):
+ """
+ A link over which messages are received.
+ """
+
+ def flow(self, n):
+ """Increases the credit issued to the remote sender by the specified number of messages."""
+ pn_link_flow(self._impl, n)
+
+ def recv(self, limit):
+ n, binary = pn_link_recv(self._impl, limit)
+ if n == PN_EOS:
+ return None
+ else:
+ self._check(n)
+ return binary
+
+ def drain(self, n):
+ pn_link_drain(self._impl, n)
+
+ def draining(self):
+ return pn_link_draining(self._impl)
+
+
+class Terminus(object):
+ UNSPECIFIED = PN_UNSPECIFIED
+ SOURCE = PN_SOURCE
+ TARGET = PN_TARGET
+ COORDINATOR = PN_COORDINATOR
+
+ NONDURABLE = PN_NONDURABLE
+ CONFIGURATION = PN_CONFIGURATION
+ DELIVERIES = PN_DELIVERIES
+
+ DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
+ DIST_MODE_COPY = PN_DIST_MODE_COPY
+ DIST_MODE_MOVE = PN_DIST_MODE_MOVE
+
+ EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
+ EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
+ EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
+ EXPIRE_NEVER = PN_EXPIRE_NEVER
+
+ def __init__(self, impl):
+ self._impl = impl
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, LinkException)
+ raise exc("[%s]" % err)
+ else:
+ return err
+
+ def _get_type(self):
+ return pn_terminus_get_type(self._impl)
+
+ def _set_type(self, type):
+ self._check(pn_terminus_set_type(self._impl, type))
+
+ type = property(_get_type, _set_type)
+
+ def _get_address(self):
+ """The address that identifies the source or target node"""
+ return utf82unicode(pn_terminus_get_address(self._impl))
+
+ def _set_address(self, address):
+ self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
+
+ address = property(_get_address, _set_address)
+
+ def _get_durability(self):
+ return pn_terminus_get_durability(self._impl)
+
+ def _set_durability(self, seconds):
+ self._check(pn_terminus_set_durability(self._impl, seconds))
+
+ durability = property(_get_durability, _set_durability)
+
+ def _get_expiry_policy(self):
+ return pn_terminus_get_expiry_policy(self._impl)
+
+ def _set_expiry_policy(self, seconds):
+ self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
+
+ expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
+
+ def _get_timeout(self):
+ return pn_terminus_get_timeout(self._impl)
+
+ def _set_timeout(self, seconds):
+ self._check(pn_terminus_set_timeout(self._impl, seconds))
+
+ timeout = property(_get_timeout, _set_timeout)
+
+ def _is_dynamic(self):
+ """Indicates whether the source or target node was dynamically
+ created"""
+ return pn_terminus_is_dynamic(self._impl)
+
+ def _set_dynamic(self, dynamic):
+ self._check(pn_terminus_set_dynamic(self._impl, dynamic))
+
+ dynamic = property(_is_dynamic, _set_dynamic)
+
+ def _get_distribution_mode(self):
+ return pn_terminus_get_distribution_mode(self._impl)
+
+ def _set_distribution_mode(self, mode):
+ self._check(pn_terminus_set_distribution_mode(self._impl, mode))
+
+ distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
+ @property
+ def properties(self):
+ """Properties of a dynamic source or target."""
+ return Data(pn_terminus_properties(self._impl))
+
+ @property
+ def capabilities(self):
+ """Capabilities of the source or target."""
+ return Data(pn_terminus_capabilities(self._impl))
+
+ @property
+ def outcomes(self):
+ return Data(pn_terminus_outcomes(self._impl))
+
+ @property
+ def filter(self):
+ """A filter on a source allows the set of messages transfered over
+ the link to be restricted"""
+ return Data(pn_terminus_filter(self._impl))
+
+ def copy(self, src):
+ self._check(pn_terminus_copy(self._impl, src._impl))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_events.py
----------------------------------------------------------------------
diff --git a/python/proton/_events.py b/python/proton/_events.py
new file mode 100644
index 0000000..c8af8e2
--- /dev/null
+++ b/python/proton/_events.py
@@ -0,0 +1,333 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import threading
+
+from cproton import PN_SESSION_REMOTE_CLOSE, PN_SESSION_FINAL, pn_event_context, pn_collector_put, \
+ PN_SELECTABLE_UPDATED, pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_attachments, pn_event_type, \
+ pn_collector_free, pn_handler_dispatch, PN_SELECTABLE_WRITABLE, PN_SELECTABLE_INIT, PN_SESSION_REMOTE_OPEN, \
+ pn_collector_peek, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \
+ PN_TRANSPORT_ERROR, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_LOCAL_CLOSE, pn_event_delivery, \
+ PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, pn_event_reactor, \
+ PN_CONNECTION_REMOTE_CLOSE, pn_collector_pop, PN_LINK_INIT, pn_event_link, PN_CONNECTION_UNBOUND, \
+ pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, PN_REACTOR_INIT, PN_REACTOR_QUIESCED, \
+ PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name, PN_SELECTABLE_READABLE, \
+ pn_event_transport, PN_TRANSPORT_TAIL_CLOSED, PN_SELECTABLE_FINAL, PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \
+ PN_SESSION_LOCAL_CLOSE, pn_event_copy, PN_REACTOR_FINAL, PN_LINK_LOCAL_OPEN, PN_SELECTABLE_EXPIRED, \
+ PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, pn_event_root, PN_SELECTABLE_ERROR, \
+ PN_CONNECTION_INIT, pn_event_class, pn_void2py, pn_cast_pn_session, pn_cast_pn_link, pn_cast_pn_delivery, \
+ pn_cast_pn_transport, pn_cast_pn_connection, pn_cast_pn_selectable
+
+from ._common import Constant
+from ._delivery import Delivery
+from ._endpoints import Connection, Session, Link
+from ._reactor_impl import Selectable, WrappedHandler
+from ._transport import Transport
+from ._wrapper import Wrapper
+
+
+class Collector(object):
+    """Owns a proton collector created with ``pn_collector()``.
+
+    Declared as a new-style class, like the other classes in this module.
+    """
+
+    def __init__(self):
+        # Handle to the underlying C collector; released in __del__.
+        self._impl = pn_collector()
+
+    def put(self, obj, etype):
+        """Queue an event of type ``etype`` whose context is ``obj``.
+
+        ``obj`` is passed as a PN_PYREF context and ``etype.number`` is
+        used as the event type number.
+        """
+        pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
+
+    def peek(self):
+        """Return the wrapped result of ``pn_collector_peek`` without
+        removing it (``Event.wrap`` yields None when there is nothing)."""
+        return Event.wrap(pn_collector_peek(self._impl))
+
+    def pop(self):
+        """Remove the event at the head of the collector. Returns None."""
+        # The head event is still wrapped first, exactly as before; the
+        # result was never used, so it is no longer bound to a local.
+        # NOTE(review): returning the popped event may have been the
+        # intent -- confirm it stays valid after pn_collector_pop.
+        self.peek()
+        pn_collector_pop(self._impl)
+
+    def __del__(self):
+        # If __init__ failed before _impl was assigned there is nothing to
+        # free; previously this raised AttributeError during finalisation.
+        impl = self.__dict__.pop("_impl", None)
+        if impl is not None:
+            pn_collector_free(impl)
+
+
+if "TypeExtender" not in globals():
+    class TypeExtender:
+        """Counter that hands out consecutive numbers from a start value."""
+
+        def __init__(self, number):
+            # The value the next call to next() will return.
+            self.number = number
+
+        def next(self):
+            """Return the current number, then advance the counter by one."""
+            current = self.number
+            self.number += 1
+            return current
+
+
+class EventType(object):
+    """Describes one kind of event: its name, number and handler method.
+
+    Every instance registers itself in the class-level ``TYPES`` dict,
+    keyed by its number.
+
+    :param name: event name; when omitted it is looked up from ``number``
+        with ``pn_event_type_name``.
+    :param number: event number; when omitted the next value from the
+        shared ``_extended`` counter (starting at 10000) is used.
+    :param method: handler method name; defaults to ``"on_<name>"``.
+    :raises TypeError: if neither ``name`` nor ``number`` is given.
+    """
+    _lock = threading.Lock()
+    _extended = TypeExtender(10000)
+    TYPES = {}
+
+    def __init__(self, name=None, number=None, method=None):
+        if name is None and number is None:
+            raise TypeError("extended events require a name")
+        # The lock guards the shared counter and the TYPES registry. Using
+        # ``with`` guarantees release only happens after a successful
+        # acquire (the old code acquired inside the try block).
+        with self._lock:
+            if name is None:
+                name = pn_event_type_name(number)
+
+            if number is None:
+                number = self._extended.next()
+
+            if method is None:
+                method = "on_%s" % name
+
+            self.name = name
+            self.number = number
+            self.method = method
+
+            self.TYPES[number] = self
+
+    def __repr__(self):
+        return self.name
+
+
+def dispatch(handler, method, *args):
+    """Call ``handler.<method>(*args)`` if the handler provides it.
+
+    If the attribute is missing (or falsy) and the handler has an
+    ``on_unhandled`` attribute, call ``handler.on_unhandled(method, *args)``
+    instead. Returns whatever was called, or None if nothing was called.
+    """
+    target = getattr(handler, method, None)
+    if target:
+        return target(*args)
+    if hasattr(handler, "on_unhandled"):
+        return handler.on_unhandled(method, *args)
+    return None
+
+
+class EventBase(object):
+    """Minimal event: stores a class, a context and a type.
+
+    ``type`` must provide a ``method`` attribute; it names the handler
+    method used by :meth:`dispatch`.
+    """
+
+    def __init__(self, clazz, context, type):
+        self.clazz, self.context, self.type = clazz, context, type
+
+    def dispatch(self, handler):
+        """Deliver this event to ``handler`` through the module-level
+        ``dispatch`` function and return its result."""
+        method_name = self.type.method
+        return dispatch(handler, method_name, self)
+
+
+def _none(x):
+    """Ignore the argument and return None."""
+    return None
+
+
+# Sentinel compared against a handler's return value in Event.dispatch: when
+# a handler returns it, the event is not passed on to the entries of that
+# handler's ``handlers`` attribute.
+DELEGATED = Constant("DELEGATED")
+
+
+def _core(number, method):
+    """Build the EventType for event ``number`` handled by ``method``.
+
+    No name is passed, so EventType derives it from the number.
+    """
+    event_type = EventType(number=number, method=method)
+    return event_type
+
+
+# Maps an event class name (see Event.clazz) to a callable that turns the
+# raw event context into a Python object. Event.context indexes this table,
+# and Event.reactor looks up "pn_reactor" in it with a fallback.
+wrappers = dict(
+    pn_void=lambda ptr: pn_void2py(ptr),
+    pn_pyref=lambda ptr: pn_void2py(ptr),
+    pn_connection=lambda ptr: Connection.wrap(pn_cast_pn_connection(ptr)),
+    pn_session=lambda ptr: Session.wrap(pn_cast_pn_session(ptr)),
+    pn_link=lambda ptr: Link.wrap(pn_cast_pn_link(ptr)),
+    pn_delivery=lambda ptr: Delivery.wrap(pn_cast_pn_delivery(ptr)),
+    pn_transport=lambda ptr: Transport.wrap(pn_cast_pn_transport(ptr)),
+    pn_selectable=lambda ptr: Selectable.wrap(pn_cast_pn_selectable(ptr)),
+)
+
+
+class Event(Wrapper, EventBase):
+ """Python wrapper around a proton event.
+
+ The class attributes below are the core EventType instances; each one
+ pairs a PN_* event number with the name of the handler method that
+ events of that type are dispatched to.
+ """
+ # Reactor events.
+ REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
+ REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
+ REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
+
+ # Timer events.
+ TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
+
+ # Connection events.
+ CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
+ CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
+ CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
+ CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
+ CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
+ CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
+ CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
+ CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
+
+ # Session events.
+ SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
+ SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
+ SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
+ SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
+ SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
+ SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
+
+ # Link events.
+ LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
+ LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
+ LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
+ LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
+ LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
+ LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
+ LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
+ LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
+ LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
+
+ # Delivery events.
+ DELIVERY = _core(PN_DELIVERY, "on_delivery")
+
+ # Transport events.
+ TRANSPORT = _core(PN_TRANSPORT, "on_transport")
+ TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
+ TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
+ TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
+ TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
+
+ # Selectable events.
+ SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
+ SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
+ SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
+ SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
+ SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
+ SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
+ SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
+
+ @staticmethod
+ def wrap(impl, number=None):
+ """Wrap the raw event ``impl``; returns None when ``impl`` is None.
+
+ ``number`` selects the event type and defaults to pn_event_type(impl).
+ If the event's class is PN_PYREF and its context is an EventBase
+ instance, that context object is returned instead of the wrapper."""
+ if impl is None:
+ return None
+
+ if number is None:
+ number = pn_event_type(impl)
+
+ event = Event(impl, number)
+
+ # Check for an application-defined event (an EventBase instance that was
+ # stored as the PN_PYREF context) and return that object itself.
+ # NOTE(review): the original comment says this avoids an expensive wrap
+ # invoked by event.context, yet event.context is evaluated here -- confirm.
+ if pn_event_class(impl) == PN_PYREF and \
+ isinstance(event.context, EventBase):
+ return event.context
+ else:
+ return event
+
+ def __init__(self, impl, number):
+ """Wrap ``impl`` and look the event type up in EventType.TYPES.
+
+ Raises KeyError if ``number`` is not a registered event type."""
+ Wrapper.__init__(self, impl, pn_event_attachments)
+ # Stored straight into the instance dict rather than by attribute
+ # assignment (presumably to bypass Wrapper's attribute handling -- confirm).
+ self.__dict__["type"] = EventType.TYPES[number]
+
+ def _init(self):
+ # Intentionally empty (presumably a hook invoked by Wrapper -- confirm).
+ pass
+
+ def copy(self):
+ """Returns a wrapped copy of this event made with pn_event_copy."""
+ copy = pn_event_copy(self._impl)
+ return Event.wrap(copy)
+
+ @property
+ def clazz(self):
+ """Returns the name of the event's class, or None if it has no class."""
+ cls = pn_event_class(self._impl)
+ if cls:
+ return pn_class_name(cls)
+ else:
+ return None
+
+ @property
+ def root(self):
+ """Returns the event's root handler as a WrappedHandler."""
+ return WrappedHandler.wrap(pn_event_root(self._impl))
+
+ @property
+ def context(self):
+ """Returns the context object associated with the event. The type of this depends on the type of event.
+
+ The converter is looked up in ``wrappers`` by the event's class name,
+ so a class name missing from that table raises KeyError."""
+ return wrappers[self.clazz](pn_event_context(self._impl))
+
+ def dispatch(self, handler, type=None):
+ """Deliver this event to ``handler`` as ``type`` (default: the event's own type).
+
+ A WrappedHandler is dispatched to through pn_handler_dispatch. Any other
+ handler has its ``type.method`` called; unless that call returns DELEGATED,
+ the event is then dispatched to each entry of ``handler.handlers``, if the
+ handler has that attribute."""
+ type = type or self.type
+ if isinstance(handler, WrappedHandler):
+ pn_handler_dispatch(handler._impl, self._impl, type.number)
+ else:
+ result = dispatch(handler, type.method, self)
+ if result != DELEGATED and hasattr(handler, "handlers"):
+ for h in handler.handlers:
+ self.dispatch(h, type)
+
+ @property
+ def reactor(self):
+ """Returns the reactor associated with the event."""
+ # "pn_reactor" is not one of the keys defined with ``wrappers`` in this
+ # module; when no such entry exists the _none fallback yields None.
+ return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
+
+ def __getattr__(self, name):
+ """Returns the reactor when ``name`` equals the lower-cased name of the
+ reactor's ``subclass``; otherwise defers to the parent __getattr__."""
+ r = self.reactor
+ if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
+ return r
+ else:
+ return super(Event, self).__getattr__(name)
+
+ @property
+ def transport(self):
+ """Returns the transport associated with the event, or None if none is associated with it."""
+ return Transport.wrap(pn_event_transport(self._impl))
+
+ @property
+ def connection(self):
+ """Returns the connection associated with the event, or None if none is associated with it."""
+ return Connection.wrap(pn_event_connection(self._impl))
+
+ @property
+ def session(self):
+ """Returns the session associated with the event, or None if none is associated with it."""
+ return Session.wrap(pn_event_session(self._impl))
+
+ @property
+ def link(self):
+ """Returns the link associated with the event, or None if none is associated with it."""
+ return Link.wrap(pn_event_link(self._impl))
+
+ @property
+ def sender(self):
+ """Returns the sender link associated with the event, or None if
+ none is associated with it. This is essentially an alias for
+ ``link`` that additionally checks that the link is a sender."""
+ l = self.link
+ if l and l.is_sender:
+ return l
+ else:
+ return None
+
+ @property
+ def receiver(self):
+ """Returns the receiver link associated with the event, or None if
+ none is associated with it. This is essentially an alias for
+ ``link`` that additionally checks that the link is a receiver."""
+ l = self.link
+ if l and l.is_receiver:
+ return l
+ else:
+ return None
+
+ @property
+ def delivery(self):
+ """Returns the delivery associated with the event, or None if none is associated with it."""
+ return Delivery.wrap(pn_event_delivery(self._impl))
+
+ def __repr__(self):
+ # Rendered as "<type>(<context>)".
+ return "%s(%s)" % (self.type, self.context)
+
+
+class LazyHandlers(object):
+    """Descriptor that creates an instance's ``handlers`` list on access."""
+
+    def __get__(self, obj, clazz):
+        # Looked up on the class itself: hand back the descriptor.
+        if obj is None:
+            return self
+        # Store a fresh list in the instance dict and return that same list.
+        # This class defines no __set__, so the stored entry takes precedence
+        # on later lookups.
+        handlers = obj.__dict__['handlers'] = []
+        return handlers
+
+
+class Handler(object):
+    """Base class for event handlers.
+
+    ``handlers`` is a per-instance list created on first access by the
+    LazyHandlers descriptor.
+    """
+    handlers = LazyHandlers()
+
+    def on_unhandled(self, method, *args):
+        """Fallback for events with no specific handler method; does nothing."""
+        return None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org