You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/11/26 21:06:07 UTC
[20/35] qpid-proton git commit: changed proton.py from a module into
a package
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6a78d2f7/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
new file mode 100644
index 0000000..fce3255
--- /dev/null
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -0,0 +1,3891 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The proton module defines a suite of APIs that implement the AMQP 1.0
+protocol.
+
+The proton APIs consist of the following classes:
+
+ - L{Messenger} -- A messaging endpoint.
+ - L{Message} -- A class for creating and/or accessing AMQP message content.
+ - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
+ data.
+
+"""
+
+from cproton import *
+
+import weakref, re, socket
+try:
+ import uuid
+except ImportError:
+ """
+ No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
+ """
+ import struct
+ class uuid:
+ class UUID:
+ def __init__(self, hex=None, bytes=None):
+ if [hex, bytes].count(None) != 1:
+ raise TypeError("need one of hex or bytes")
+ if bytes is not None:
+ self.bytes = bytes
+ elif hex is not None:
+ fields=hex.split("-")
+ fields[4:5] = [fields[4][:4], fields[4][4:]]
+ self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
+
+ def __cmp__(self, other):
+ if isinstance(other, uuid.UUID):
+ return cmp(self.bytes, other.bytes)
+ else:
+ return -1
+
+ def __str__(self):
+ return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
+
+ def __repr__(self):
+ return "UUID(%r)" % str(self)
+
+ def __hash__(self):
+ return self.bytes.__hash__()
+
+ import os, random, socket, time
+ rand = random.Random()
+ rand.seed((os.getpid(), time.time(), socket.gethostname()))
+ def random_uuid():
+ bytes = [rand.randint(0, 255) for i in xrange(16)]
+
+ # From RFC4122, the version bits are set to 0100
+ bytes[7] &= 0x0F
+ bytes[7] |= 0x40
+
+ # From RFC4122, the top two bits of byte 8 get set to 01
+ bytes[8] &= 0x3F
+ bytes[8] |= 0x80
+ return "".join(map(chr, bytes))
+
+ def uuid4():
+ return uuid.UUID(bytes=random_uuid())
+
+try:
+ bytes()
+except NameError:
+ bytes = str
+
+VERSION_MAJOR = PN_VERSION_MAJOR
+VERSION_MINOR = PN_VERSION_MINOR
+API_LANGUAGE = "C"
+IMPLEMENTATION_LANGUAGE = "C"
+
+class Constant(object):
+
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.name
+
+class ProtonException(Exception):
+ """
+ The root of the proton exception hierarchy. All proton exception
+ classes derive from this exception.
+ """
+ pass
+
+class Timeout(ProtonException):
+ """
+ A timeout exception indicates that a blocking operation has timed
+ out.
+ """
+ pass
+
+class Interrupt(ProtonException):
+ """
+ An interrupt exception indicaes that a blocking operation was interrupted.
+ """
+ pass
+
+class MessengerException(ProtonException):
+ """
+ The root of the messenger exception hierarchy. All exceptions
+ generated by the messenger class derive from this exception.
+ """
+ pass
+
+class MessageException(ProtonException):
+ """
+ The MessageException class is the root of the message exception
+ hierarhcy. All exceptions generated by the Message class derive from
+ this exception.
+ """
+ pass
+
+EXCEPTIONS = {
+ PN_TIMEOUT: Timeout,
+ PN_INTR: Interrupt
+ }
+
+PENDING = Constant("PENDING")
+ACCEPTED = Constant("ACCEPTED")
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
+ABORTED = Constant("ABORTED")
+SETTLED = Constant("SETTLED")
+
+STATUSES = {
+ PN_STATUS_ABORTED: ABORTED,
+ PN_STATUS_ACCEPTED: ACCEPTED,
+ PN_STATUS_REJECTED: REJECTED,
+ PN_STATUS_RELEASED: RELEASED,
+ PN_STATUS_PENDING: PENDING,
+ PN_STATUS_SETTLED: SETTLED,
+ PN_STATUS_UNKNOWN: None
+ }
+
+AUTOMATIC = Constant("AUTOMATIC")
+MANUAL = Constant("MANUAL")
+
+class Messenger(object):
+ """
+ The L{Messenger} class defines a high level interface for sending
+ and receiving L{Messages<Message>}. Every L{Messenger} contains a
+ single logical queue of incoming messages and a single logical queue
+ of outgoing messages. These messages in these queues may be destined
+ for, or originate from, a variety of addresses.
+
+ The messenger interface is single-threaded. All methods
+ except one (L{interrupt}) are intended to be used from within
+ the messenger thread.
+
+
+ Address Syntax
+ ==============
+
+ An address has the following form::
+
+ [ amqp[s]:// ] [user[:password]@] domain [/[name]]
+
+ Where domain can be one of::
+
+ host | host:port | ip | ip:port | name
+
+ The following are valid examples of addresses:
+
+ - example.org
+ - example.org:1234
+ - amqp://example.org
+ - amqps://example.org
+ - example.org/incoming
+ - amqps://example.org/outgoing
+ - amqps://fred:trustno1@example.org
+ - 127.0.0.1:1234
+ - amqps://127.0.0.1:1234
+
+ Sending & Receiving Messages
+ ============================
+
+ The L{Messenger} class works in conjuction with the L{Message} class. The
+ L{Message} class is a mutable holder of message content.
+
+ The L{put} method copies its L{Message} to the outgoing queue, and may
+ send queued messages if it can do so without blocking. The L{send}
+ method blocks until it has sent the requested number of messages,
+ or until a timeout interrupts the attempt.
+
+
+ >>> message = Message()
+ >>> for i in range(3):
+ ... message.address = "amqp://host/queue"
+ ... message.subject = "Hello World %i" % i
+ ... messenger.put(message)
+ >>> messenger.send()
+
+ Similarly, the L{recv} method receives messages into the incoming
+ queue, and may block as it attempts to receive the requested number
+ of messages, or until timeout is reached. It may receive fewer
+ than the requested number. The L{get} method pops the
+ eldest L{Message} off the incoming queue and copies it into the L{Message}
+ object that you supply. It will not block.
+
+
+ >>> message = Message()
+ >>> messenger.recv(10):
+ >>> while messenger.incoming > 0:
+ ... messenger.get(message)
+ ... print message.subject
+ Hello World 0
+ Hello World 1
+ Hello World 2
+
+ The blocking flag allows you to turn off blocking behavior entirely,
+ in which case L{send} and L{recv} will do whatever they can without
+ blocking, and then return. You can then look at the number
+ of incoming and outgoing messages to see how much outstanding work
+ still remains.
+ """
+
+ def __init__(self, name=None):
+ """
+ Construct a new L{Messenger} with the given name. The name has
+ global scope. If a NULL name is supplied, a UUID based name will
+ be chosen.
+
+ @type name: string
+ @param name: the name of the messenger or None
+
+ """
+ self._mng = pn_messenger(name)
+ self._selectables = {}
+
+ def __del__(self):
+ """
+ Destroy the L{Messenger}. This will close all connections that
+ are managed by the L{Messenger}. Call the L{stop} method before
+ destroying the L{Messenger}.
+ """
+ if hasattr(self, "_mng"):
+ pn_messenger_free(self._mng)
+ del self._mng
+
+ def _check(self, err):
+ if err < 0:
+ if (err == PN_INPROGRESS):
+ return
+ exc = EXCEPTIONS.get(err, MessengerException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
+ else:
+ return err
+
+ @property
+ def name(self):
+ """
+ The name of the L{Messenger}.
+ """
+ return pn_messenger_name(self._mng)
+
+ def _get_certificate(self):
+ return pn_messenger_get_certificate(self._mng)
+
+ def _set_certificate(self, value):
+ self._check(pn_messenger_set_certificate(self._mng, value))
+
+ certificate = property(_get_certificate, _set_certificate,
+ doc="""
+Path to a certificate file for the L{Messenger}. This certificate is
+used when the L{Messenger} accepts or establishes SSL/TLS connections.
+This property must be specified for the L{Messenger} to accept
+incoming SSL/TLS connections and to establish client authenticated
+outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
+connections do not require this property.
+""")
+
+ def _get_private_key(self):
+ return pn_messenger_get_private_key(self._mng)
+
+ def _set_private_key(self, value):
+ self._check(pn_messenger_set_private_key(self._mng, value))
+
+ private_key = property(_get_private_key, _set_private_key,
+ doc="""
+Path to a private key file for the L{Messenger's<Messenger>}
+certificate. This property must be specified for the L{Messenger} to
+accept incoming SSL/TLS connections and to establish client
+authenticated outgoing SSL/TLS connection. Non client authenticated
+SSL/TLS connections do not require this property.
+""")
+
+ def _get_password(self):
+ return pn_messenger_get_password(self._mng)
+
+ def _set_password(self, value):
+ self._check(pn_messenger_set_password(self._mng, value))
+
+ password = property(_get_password, _set_password,
+ doc="""
+This property contains the password for the L{Messenger.private_key}
+file, or None if the file is not encrypted.
+""")
+
+ def _get_trusted_certificates(self):
+ return pn_messenger_get_trusted_certificates(self._mng)
+
+ def _set_trusted_certificates(self, value):
+ self._check(pn_messenger_set_trusted_certificates(self._mng, value))
+
+ trusted_certificates = property(_get_trusted_certificates,
+ _set_trusted_certificates,
+ doc="""
+A path to a database of trusted certificates for use in verifying the
+peer on an SSL/TLS connection. If this property is None, then the peer
+will not be verified.
+""")
+
+ def _get_timeout(self):
+ t = pn_messenger_get_timeout(self._mng)
+ if t == -1:
+ return None
+ else:
+ return millis2secs(t)
+
+ def _set_timeout(self, value):
+ if value is None:
+ t = -1
+ else:
+ t = secs2millis(value)
+ self._check(pn_messenger_set_timeout(self._mng, t))
+
+ timeout = property(_get_timeout, _set_timeout,
+ doc="""
+The timeout property contains the default timeout for blocking
+operations performed by the L{Messenger}.
+""")
+
+ def _is_blocking(self):
+ return pn_messenger_is_blocking(self._mng)
+
+ def _set_blocking(self, b):
+ self._check(pn_messenger_set_blocking(self._mng, b))
+
+ blocking = property(_is_blocking, _set_blocking,
+ doc="""
+Enable or disable blocking behavior during L{Message} sending
+and receiving. This affects every blocking call, with the
+exception of L{work}. Currently, the affected calls are
+L{send}, L{recv}, and L{stop}.
+""")
+
+ def _is_passive(self):
+ return pn_messenger_is_passive(self._mng)
+
+ def _set_passive(self, b):
+ self._check(pn_messenger_set_passive(self._mng, b))
+
+ passive = property(_is_passive, _set_passive,
+ doc="""
+When passive is set to true, Messenger will not attempt to perform I/O
+internally. In this mode it is necessary to use the selectables API to
+drive any I/O needed to perform requested actions. In this mode
+Messenger will never block.
+""")
+
+ def _get_incoming_window(self):
+ return pn_messenger_get_incoming_window(self._mng)
+
+ def _set_incoming_window(self, window):
+ self._check(pn_messenger_set_incoming_window(self._mng, window))
+
+ incoming_window = property(_get_incoming_window, _set_incoming_window,
+ doc="""
+The incoming tracking window for the messenger. The messenger will
+track the remote status of this many incoming deliveries after they
+have been accepted or rejected. Defaults to zero.
+
+L{Messages<Message>} enter this window only when you take them into your application
+using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
+without explicitly accepting or rejecting the oldest message, then the
+message that passes beyond the edge of the incoming window will be assigned
+the default disposition of its link.
+""")
+
+ def _get_outgoing_window(self):
+ return pn_messenger_get_outgoing_window(self._mng)
+
+ def _set_outgoing_window(self, window):
+ self._check(pn_messenger_set_outgoing_window(self._mng, window))
+
+ outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
+ doc="""
+The outgoing tracking window for the messenger. The messenger will
+track the remote status of this many outgoing deliveries after calling
+send. Defaults to zero.
+
+A L{Message} enters this window when you call the put() method with the
+message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
+times, status information will no longer be available for the
+first message.
+""")
+
+ def start(self):
+ """
+ Currently a no-op placeholder.
+ For future compatibility, do not L{send} or L{recv} messages
+ before starting the L{Messenger}.
+ """
+ self._check(pn_messenger_start(self._mng))
+
+ def stop(self):
+ """
+ Transitions the L{Messenger} to an inactive state. An inactive
+ L{Messenger} will not send or receive messages from its internal
+ queues. A L{Messenger} should be stopped before being discarded to
+ ensure a clean shutdown handshake occurs on any internally managed
+ connections.
+ """
+ self._check(pn_messenger_stop(self._mng))
+
+ @property
+ def stopped(self):
+ """
+ Returns true iff a L{Messenger} is in the stopped state.
+ This function does not block.
+ """
+ return pn_messenger_stopped(self._mng)
+
+ def subscribe(self, source):
+ """
+ Subscribes the L{Messenger} to messages originating from the
+ specified source. The source is an address as specified in the
+ L{Messenger} introduction with the following addition. If the
+ domain portion of the address begins with the '~' character, the
+ L{Messenger} will interpret the domain as host/port, bind to it,
+ and listen for incoming messages. For example "~0.0.0.0",
+ "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
+ local interface and listen for incoming messages with the last
+ variant only permitting incoming SSL connections.
+
+ @type source: string
+ @param source: the source of messages to subscribe to
+ """
+ sub_impl = pn_messenger_subscribe(self._mng, source)
+ if not sub_impl:
+ self._check(pn_error_code(pn_messenger_error(self._mng)))
+ raise MessengerException("Cannot subscribe to %s"%source)
+ return Subscription(sub_impl)
+
+ def put(self, message):
+ """
+ Places the content contained in the message onto the outgoing
+ queue of the L{Messenger}. This method will never block, however
+ it will send any unblocked L{Messages<Message>} in the outgoing
+ queue immediately and leave any blocked L{Messages<Message>}
+ remaining in the outgoing queue. The L{send} call may be used to
+ block until the outgoing queue is empty. The L{outgoing} property
+ may be used to check the depth of the outgoing queue.
+
+ When the content in a given L{Message} object is copied to the outgoing
+ message queue, you may then modify or discard the L{Message} object
+ without having any impact on the content in the outgoing queue.
+
+ This method returns an outgoing tracker for the L{Message}. The tracker
+ can be used to determine the delivery status of the L{Message}.
+
+ @type message: Message
+ @param message: the message to place in the outgoing queue
+ @return: a tracker
+ """
+ message._pre_encode()
+ self._check(pn_messenger_put(self._mng, message._msg))
+ return pn_messenger_outgoing_tracker(self._mng)
+
+ def status(self, tracker):
+ """
+ Gets the last known remote state of the delivery associated with
+ the given tracker.
+
+ @type tracker: tracker
+ @param tracker: the tracker whose status is to be retrieved
+
+ @return: one of None, PENDING, REJECTED, or ACCEPTED
+ """
+ disp = pn_messenger_status(self._mng, tracker);
+ return STATUSES.get(disp, disp)
+
+ def buffered(self, tracker):
+ """
+ Checks if the delivery associated with the given tracker is still
+ waiting to be sent.
+
+ @type tracker: tracker
+ @param tracker: the tracker whose status is to be retrieved
+
+ @return: true if delivery is still buffered
+ """
+ return pn_messenger_buffered(self._mng, tracker);
+
+ def settle(self, tracker=None):
+ """
+ Frees a L{Messenger} from tracking the status associated with a given
+ tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
+ to the most recent will be settled.
+ """
+ if tracker is None:
+ tracker = pn_messenger_outgoing_tracker(self._mng)
+ flags = PN_CUMULATIVE
+ else:
+ flags = 0
+ self._check(pn_messenger_settle(self._mng, tracker, flags))
+
+ def send(self, n=-1):
+ """
+ This call will block until the indicated number of L{messages<Message>}
+ have been sent, or until the operation times out. If n is -1 this call will
+ block until all outgoing L{messages<Message>} have been sent. If n is 0 then
+ this call will send whatever it can without blocking.
+ """
+ self._check(pn_messenger_send(self._mng, n))
+
+ def recv(self, n=None):
+ """
+ Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
+ for I{n} is supplied, this call will receive as many L{messages<Message>} as it
+ can buffer internally. If the L{Messenger} is in blocking mode, this
+ call will block until at least one L{Message} is available in the
+ incoming queue.
+ """
+ if n is None:
+ n = -1
+ self._check(pn_messenger_recv(self._mng, n))
+
+ def work(self, timeout=None):
+ """
+ Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
+ This will block for the indicated timeout.
+ This method may also do I/O work other than sending and receiving
+ L{messages<Message>}. For example, closing connections after messenger.L{stop}()
+ has been called.
+ """
+ if timeout is None:
+ t = -1
+ else:
+ t = secs2millis(timeout)
+ err = pn_messenger_work(self._mng, t)
+ if (err == PN_TIMEOUT):
+ return False
+ else:
+ self._check(err)
+ return True
+
+ @property
+ def receiving(self):
+ return pn_messenger_receiving(self._mng)
+
+ def interrupt(self):
+ """
+ The L{Messenger} interface is single-threaded.
+ This is the only L{Messenger} function intended to be called
+ from outside of the L{Messenger} thread.
+ Call this from a non-messenger thread to interrupt
+ a L{Messenger} that is blocking.
+ This will cause any in-progress blocking call to throw
+ the L{Interrupt} exception. If there is no currently blocking
+ call, then the next blocking call will be affected, even if it
+ is within the same thread that interrupt was called from.
+ """
+ self._check(pn_messenger_interrupt(self._mng))
+
+ def get(self, message=None):
+ """
+ Moves the message from the head of the incoming message queue into
+ the supplied message object. Any content in the message will be
+ overwritten.
+
+ A tracker for the incoming L{Message} is returned. The tracker can
+ later be used to communicate your acceptance or rejection of the
+ L{Message}.
+
+ If None is passed in for the L{Message} object, the L{Message}
+ popped from the head of the queue is discarded.
+
+ @type message: Message
+ @param message: the destination message object
+ @return: a tracker
+ """
+ if message is None:
+ impl = None
+ else:
+ impl = message._msg
+ self._check(pn_messenger_get(self._mng, impl))
+ if message is not None:
+ message._post_decode()
+ return pn_messenger_incoming_tracker(self._mng)
+
+ def accept(self, tracker=None):
+ """
+ Signal the sender that you have acted on the L{Message}
+ pointed to by the tracker. If no tracker is supplied,
+ then all messages that have been returned by the L{get}
+ method are accepted, except those that have already been
+ auto-settled by passing beyond your incoming window size.
+
+ @type tracker: tracker
+ @param tracker: a tracker as returned by get
+ """
+ if tracker is None:
+ tracker = pn_messenger_incoming_tracker(self._mng)
+ flags = PN_CUMULATIVE
+ else:
+ flags = 0
+ self._check(pn_messenger_accept(self._mng, tracker, flags))
+
+ def reject(self, tracker=None):
+ """
+ Rejects the L{Message} indicated by the tracker. If no tracker
+ is supplied, all messages that have been returned by the L{get}
+ method are rejected, except those that have already been auto-settled
+ by passing beyond your outgoing window size.
+
+ @type tracker: tracker
+ @param tracker: a tracker as returned by get
+ """
+ if tracker is None:
+ tracker = pn_messenger_incoming_tracker(self._mng)
+ flags = PN_CUMULATIVE
+ else:
+ flags = 0
+ self._check(pn_messenger_reject(self._mng, tracker, flags))
+
+ @property
+ def outgoing(self):
+ """
+ The outgoing queue depth.
+ """
+ return pn_messenger_outgoing(self._mng)
+
+ @property
+ def incoming(self):
+ """
+ The incoming queue depth.
+ """
+ return pn_messenger_incoming(self._mng)
+
+ def route(self, pattern, address):
+ """
+ Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
+
+ The route procedure may be used to influence how a L{Messenger} will
+ internally treat a given address or class of addresses. Every call
+ to the route procedure will result in L{Messenger} appending a routing
+ rule to its internal routing table.
+
+ Whenever a L{Message} is presented to a L{Messenger} for delivery, it
+ will match the address of this message against the set of routing
+ rules in order. The first rule to match will be triggered, and
+ instead of routing based on the address presented in the message,
+ the L{Messenger} will route based on the address supplied in the rule.
+
+ The pattern matching syntax supports two types of matches, a '%'
+ will match any character except a '/', and a '*' will match any
+ character including a '/'.
+
+ A routing address is specified as a normal AMQP address, however it
+ may additionally use substitution variables from the pattern match
+ that triggered the rule.
+
+ Any message sent to "foo" will be routed to "amqp://foo.com":
+
+ >>> messenger.route("foo", "amqp://foo.com");
+
+ Any message sent to "foobar" will be routed to
+ "amqp://foo.com/bar":
+
+ >>> messenger.route("foobar", "amqp://foo.com/bar");
+
+ Any message sent to bar/<path> will be routed to the corresponding
+ path within the amqp://bar.com domain:
+
+ >>> messenger.route("bar/*", "amqp://bar.com/$1");
+
+ Route all L{messages<Message>} over TLS:
+
+ >>> messenger.route("amqp:*", "amqps:$1")
+
+ Supply credentials for foo.com:
+
+ >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
+
+ Supply credentials for all domains:
+
+ >>> messenger.route("amqp://*", "amqp://user:password@$1");
+
+ Route all addresses through a single proxy while preserving the
+ original destination:
+
+ >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
+
+ Route any address through a single broker:
+
+ >>> messenger.route("*", "amqp://user:password@broker/$1");
+ """
+ self._check(pn_messenger_route(self._mng, pattern, address))
+
+ def rewrite(self, pattern, address):
+ """
+ Similar to route(), except that the destination of
+ the L{Message} is determined before the message address is rewritten.
+
+ The outgoing address is only rewritten after routing has been
+ finalized. If a message has an outgoing address of
+ "amqp://0.0.0.0:5678", and a rewriting rule that changes its
+ outgoing address to "foo", it will still arrive at the peer that
+ is listening on "amqp://0.0.0.0:5678", but when it arrives there,
+ the receiver will see its outgoing address as "foo".
+
+ The default rewrite rule removes username and password from addresses
+ before they are transmitted.
+ """
+ self._check(pn_messenger_rewrite(self._mng, pattern, address))
+
+ def selectable(self):
+ impl = pn_messenger_selectable(self._mng)
+ if impl:
+ fd = pn_selectable_fd(impl)
+ sel = self._selectables.get(fd, None)
+ if sel is None:
+ sel = Selectable(self, impl)
+ self._selectables[fd] = sel
+ return sel
+ else:
+ return None
+
+ @property
+ def deadline(self):
+ tstamp = pn_messenger_deadline(self._mng)
+ if tstamp:
+ return millis2secs(tstamp)
+ else:
+ return None
+
+class Message(object):
+ """The L{Message} class is a mutable holder of message content.
+
+ @ivar instructions: delivery instructions for the message
+ @type instructions: dict
+ @ivar annotations: infrastructure defined message annotations
+ @type annotations: dict
+ @ivar properties: application defined message properties
+ @type properties: dict
+ @ivar body: message body
+ @type body: bytes | unicode | dict | list | int | long | float | UUID
+ """
+
+ DATA = PN_DATA
+ TEXT = PN_TEXT
+ AMQP = PN_AMQP
+ JSON = PN_JSON
+
+ DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
+
+ def __init__(self, **kwargs):
+ """
+ @param kwargs: Message property name/value pairs to initialise the Message
+ """
+ self._msg = pn_message()
+ self._id = Data(pn_message_id(self._msg))
+ self._correlation_id = Data(pn_message_correlation_id(self._msg))
+ self.instructions = None
+ self.annotations = None
+ self.properties = None
+ self.body = None
+ for k,v in kwargs.iteritems():
+ getattr(self, k) # Raise exception if it's not a valid attribute.
+ setattr(self, k, v)
+
+ def __del__(self):
+ if hasattr(self, "_msg"):
+ pn_message_free(self._msg)
+ del self._msg
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, MessageException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
+ else:
+ return err
+
+ def _pre_encode(self):
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+ body = Data(pn_message_body(self._msg))
+
+ inst.clear()
+ if self.instructions is not None:
+ inst.put_object(self.instructions)
+ ann.clear()
+ if self.annotations is not None:
+ ann.put_object(self.annotations)
+ props.clear()
+ if self.properties is not None:
+ props.put_object(self.properties)
+ body.clear()
+ if self.body is not None:
+ body.put_object(self.body)
+
+ def _post_decode(self):
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+ body = Data(pn_message_body(self._msg))
+
+ if inst.next():
+ self.instructions = inst.get_object()
+ else:
+ self.instructions = None
+ if ann.next():
+ self.annotations = ann.get_object()
+ else:
+ self.annotations = None
+ if props.next():
+ self.properties = props.get_object()
+ else:
+ self.properties = None
+ if body.next():
+ self.body = body.get_object()
+ else:
+ self.body = None
+
+ def clear(self):
+ """
+ Clears the contents of the L{Message}. All fields will be reset to
+ their default values.
+ """
+ pn_message_clear(self._msg)
+ self.instructions = None
+ self.annotations = None
+ self.properties = None
+ self.body = None
+
+ def _is_inferred(self):
+ return pn_message_is_inferred(self._msg)
+
+ def _set_inferred(self, value):
+ self._check(pn_message_set_inferred(self._msg, bool(value)))
+
+ inferred = property(_is_inferred, _set_inferred, doc="""
+The inferred flag for a message indicates how the message content
+is encoded into AMQP sections. If inferred is true then binary and
+list values in the body of the message will be encoded as AMQP DATA
+and AMQP SEQUENCE sections, respectively. If inferred is false,
+then all values in the body of the message will be encoded as AMQP
+VALUE sections regardless of their type.
+""")
+
+ def _is_durable(self):
+ return pn_message_is_durable(self._msg)
+
+ def _set_durable(self, value):
+ self._check(pn_message_set_durable(self._msg, bool(value)))
+
+ durable = property(_is_durable, _set_durable,
+ doc="""
+The durable property indicates that the message should be held durably
+by any intermediaries taking responsibility for the message.
+""")
+
+ def _get_priority(self):
+ return pn_message_get_priority(self._msg)
+
+ def _set_priority(self, value):
+ self._check(pn_message_set_priority(self._msg, value))
+
+ priority = property(_get_priority, _set_priority,
+ doc="""
+The priority of the message.
+""")
+
+ def _get_ttl(self):
+ return millis2secs(pn_message_get_ttl(self._msg))
+
+ def _set_ttl(self, value):
+ self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
+
+ ttl = property(_get_ttl, _set_ttl,
+ doc="""
+The time to live of the message measured in seconds. Expired messages
+may be dropped.
+""")
+
+ def _is_first_acquirer(self):
+ return pn_message_is_first_acquirer(self._msg)
+
+ def _set_first_acquirer(self, value):
+ self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
+
+ first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
+ doc="""
+True iff the recipient is the first to acquire the message.
+""")
+
+ def _get_delivery_count(self):
+ return pn_message_get_delivery_count(self._msg)
+
+ def _set_delivery_count(self, value):
+ self._check(pn_message_set_delivery_count(self._msg, value))
+
+ delivery_count = property(_get_delivery_count, _set_delivery_count,
+ doc="""
+The number of delivery attempts made for this message.
+""")
+
+
+ def _get_id(self):
+ return self._id.get_object()
+ def _set_id(self, value):
+ if type(value) in (int, long):
+ value = ulong(value)
+ self._id.rewind()
+ self._id.put_object(value)
+ id = property(_get_id, _set_id,
+ doc="""
+The id of the message.
+""")
+
+ def _get_user_id(self):
+ return pn_message_get_user_id(self._msg)
+
+ def _set_user_id(self, value):
+ self._check(pn_message_set_user_id(self._msg, value))
+
+ user_id = property(_get_user_id, _set_user_id,
+ doc="""
+The user id of the message creator.
+""")
+
+ def _get_address(self):
+ return pn_message_get_address(self._msg)
+
+ def _set_address(self, value):
+ self._check(pn_message_set_address(self._msg, value))
+
+ address = property(_get_address, _set_address,
+ doc="""
+The address of the message.
+""")
+
+ def _get_subject(self):
+ return pn_message_get_subject(self._msg)
+
+ def _set_subject(self, value):
+ self._check(pn_message_set_subject(self._msg, value))
+
+ subject = property(_get_subject, _set_subject,
+ doc="""
+The subject of the message.
+""")
+
+ def _get_reply_to(self):
+ return pn_message_get_reply_to(self._msg)
+
+ def _set_reply_to(self, value):
+ self._check(pn_message_set_reply_to(self._msg, value))
+
+ reply_to = property(_get_reply_to, _set_reply_to,
+ doc="""
+The reply-to address for the message.
+""")
+
+ def _get_correlation_id(self):
+ return self._correlation_id.get_object()
+ def _set_correlation_id(self, value):
+ if type(value) in (int, long):
+ value = ulong(value)
+ self._correlation_id.rewind()
+ self._correlation_id.put_object(value)
+
+ correlation_id = property(_get_correlation_id, _set_correlation_id,
+ doc="""
+The correlation-id for the message.
+""")
+
+ def _get_content_type(self):
+ return pn_message_get_content_type(self._msg)
+
+ def _set_content_type(self, value):
+ self._check(pn_message_set_content_type(self._msg, value))
+
+ content_type = property(_get_content_type, _set_content_type,
+ doc="""
+The content-type of the message.
+""")
+
+ def _get_content_encoding(self):
+ return pn_message_get_content_encoding(self._msg)
+
+ def _set_content_encoding(self, value):
+ self._check(pn_message_set_content_encoding(self._msg, value))
+
+ content_encoding = property(_get_content_encoding, _set_content_encoding,
+ doc="""
+The content-encoding of the message.
+""")
+
+ def _get_expiry_time(self):
+ return millis2secs(pn_message_get_expiry_time(self._msg))
+
+ def _set_expiry_time(self, value):
+ self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
+
+ expiry_time = property(_get_expiry_time, _set_expiry_time,
+ doc="""
+The expiry time of the message.
+""")
+
+ def _get_creation_time(self):
+ return millis2secs(pn_message_get_creation_time(self._msg))
+
+ def _set_creation_time(self, value):
+ self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
+
+ creation_time = property(_get_creation_time, _set_creation_time,
+ doc="""
+The creation time of the message.
+""")
+
+ def _get_group_id(self):
+ return pn_message_get_group_id(self._msg)
+
+ def _set_group_id(self, value):
+ self._check(pn_message_set_group_id(self._msg, value))
+
+ group_id = property(_get_group_id, _set_group_id,
+ doc="""
+The group id of the message.
+""")
+
+ def _get_group_sequence(self):
+ return pn_message_get_group_sequence(self._msg)
+
+ def _set_group_sequence(self, value):
+ self._check(pn_message_set_group_sequence(self._msg, value))
+
+ group_sequence = property(_get_group_sequence, _set_group_sequence,
+ doc="""
+The sequence of the message within its group.
+""")
+
+ def _get_reply_to_group_id(self):
+ return pn_message_get_reply_to_group_id(self._msg)
+
+ def _set_reply_to_group_id(self, value):
+ self._check(pn_message_set_reply_to_group_id(self._msg, value))
+
+ reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
+ doc="""
+The group-id for any replies.
+""")
+
+ # XXX
+ def _get_format(self):
+ return pn_message_get_format(self._msg)
+
+ def _set_format(self, value):
+ self._check(pn_message_set_format(self._msg, value))
+
+ format = property(_get_format, _set_format,
+ doc="""
+The format of the message.
+""")
+
+ def encode(self):
+ self._pre_encode()
+ sz = 16
+ while True:
+ err, data = pn_message_encode(self._msg, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return data
+
+ def decode(self, data):
+ self._check(pn_message_decode(self._msg, data, len(data)))
+ self._post_decode()
+
+ def load(self, data):
+ self._check(pn_message_load(self._msg, data))
+
+ def save(self):
+ sz = 16
+ while True:
+ err, data = pn_message_save(self._msg, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return data
+
+ def __repr2__(self):
+ props = []
+ for attr in ("inferred", "address", "reply_to", "durable", "ttl",
+ "priority", "first_acquirer", "delivery_count", "id",
+ "correlation_id", "user_id", "group_id", "group_sequence",
+ "reply_to_group_id", "instructions", "annotations",
+ "properties", "body"):
+ value = getattr(self, attr)
+ if value: props.append("%s=%r" % (attr, value))
+ return "Message(%s)" % ", ".join(props)
+
+ def __repr__(self):
+ tmp = pn_string(None)
+ err = pn_inspect(self._msg, tmp)
+ result = pn_string_get(tmp)
+ pn_free(tmp)
+ self._check(err)
+ return result
+
+class Subscription(object):
+
+ def __init__(self, impl):
+ self._impl = impl
+
+ @property
+ def address(self):
+ return pn_subscription_address(self._impl)
+
+class Selectable(object):
+
+ def __init__(self, messenger, impl):
+ self.messenger = messenger
+ self._impl = impl
+
+ def fileno(self):
+ if not self._impl: raise ValueError("selectable freed")
+ return pn_selectable_fd(self._impl)
+
+ @property
+ def capacity(self):
+ if not self._impl: raise ValueError("selectable freed")
+ return pn_selectable_capacity(self._impl)
+
+ @property
+ def pending(self):
+ if not self._impl: raise ValueError("selectable freed")
+ return pn_selectable_pending(self._impl)
+
+ @property
+ def deadline(self):
+ if not self._impl: raise ValueError("selectable freed")
+ tstamp = pn_selectable_deadline(self._impl)
+ if tstamp:
+ return millis2secs(tstamp)
+ else:
+ return None
+
+ def readable(self):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_readable(self._impl)
+
+ def writable(self):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_writable(self._impl)
+
+ def expired(self):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_expired(self._impl)
+
+ def _is_registered(self):
+ if not self._impl: raise ValueError("selectable freed")
+ return pn_selectable_is_registered(self._impl)
+
+ def _set_registered(self, registered):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_set_registered(self._impl, registered)
+
+ registered = property(_is_registered, _set_registered,
+ doc="""
+The registered property may be get/set by an I/O polling system to
+indicate whether the fd has been registered or not.
+""")
+
+ @property
+ def is_terminal(self):
+ if not self._impl: return True
+ return pn_selectable_is_terminal(self._impl)
+
+ def free(self):
+ if self._impl:
+ del self.messenger._selectables[self.fileno()]
+ pn_selectable_free(self._impl)
+ self._impl = None
+
+ def __del__(self):
+ self.free()
+
+class DataException(ProtonException):
+ """
+ The DataException class is the root of the Data exception hierarchy.
+ All exceptions raised by the Data class extend this exception.
+ """
+ pass
+
+class UnmappedType:
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __repr__(self):
+ return "UnmappedType(%s)" % self.msg
+
+class ulong(long):
+
+ def __repr__(self):
+ return "ulong(%s)" % long.__repr__(self)
+
+class timestamp(long):
+
+ def __repr__(self):
+ return "timestamp(%s)" % long.__repr__(self)
+
+class symbol(unicode):
+
+ def __repr__(self):
+ return "symbol(%s)" % unicode.__repr__(self)
+
+class char(unicode):
+
+ def __repr__(self):
+ return "char(%s)" % unicode.__repr__(self)
+
+class Described(object):
+
+ def __init__(self, descriptor, value):
+ self.descriptor = descriptor
+ self.value = value
+
+ def __repr__(self):
+ return "Described(%r, %r)" % (self.descriptor, self.value)
+
+ def __eq__(self, o):
+ if isinstance(o, Described):
+ return self.descriptor == o.descriptor and self.value == o.value
+ else:
+ return False
+
+UNDESCRIBED = Constant("UNDESCRIBED")
+
+class Array(object):
+
+ def __init__(self, descriptor, type, *elements):
+ self.descriptor = descriptor
+ self.type = type
+ self.elements = elements
+
+ def __repr__(self):
+ if self.elements:
+ els = ", %s" % (", ".join(map(repr, self.elements)))
+ else:
+ els = ""
+ return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
+
+ def __eq__(self, o):
+ if isinstance(o, Array):
+ return self.descriptor == o.descriptor and \
+ self.type == o.type and self.elements == o.elements
+ else:
+ return False
+
+class Data:
+ """
+ The L{Data} class provides an interface for decoding, extracting,
+ creating, and encoding arbitrary AMQP data. A L{Data} object
+ contains a tree of AMQP values. Leaf nodes in this tree correspond
+ to scalars in the AMQP type system such as L{ints<INT>} or
+ L{strings<STRING>}. Non-leaf nodes in this tree correspond to
+ compound values in the AMQP type system such as L{lists<LIST>},
+ L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
+ The root node of the tree is the L{Data} object itself and can have
+ an arbitrary number of children.
+
+ A L{Data} object maintains the notion of the current sibling node
+ and a current parent node. Siblings are ordered within their parent.
+ Values are accessed and/or added by using the L{next}, L{prev},
+ L{enter}, and L{exit} methods to navigate to the desired location in
+ the tree and using the supplied variety of put_*/get_* methods to
+ access or add a value of the desired type.
+
+ The put_* methods will always add a value I{after} the current node
+ in the tree. If the current node has a next sibling the put_* method
+ will overwrite the value on this node. If there is no current node
+ or the current node has no next sibling then one will be added. The
+ put_* methods always set the added/modified node to the current
+ node. The get_* methods read the value of the current node and do
+ not change which node is current.
+
+ The following types of scalar values are supported:
+
+ - L{NULL}
+ - L{BOOL}
+ - L{UBYTE}
+ - L{USHORT}
+ - L{SHORT}
+ - L{UINT}
+ - L{INT}
+ - L{ULONG}
+ - L{LONG}
+ - L{FLOAT}
+ - L{DOUBLE}
+ - L{BINARY}
+ - L{STRING}
+ - L{SYMBOL}
+
+ The following types of compound values are supported:
+
+ - L{DESCRIBED}
+ - L{ARRAY}
+ - L{LIST}
+ - L{MAP}
+ """
+
+ NULL = PN_NULL; "A null value."
+ BOOL = PN_BOOL; "A boolean value."
+ UBYTE = PN_UBYTE; "An unsigned byte value."
+ BYTE = PN_BYTE; "A signed byte value."
+ USHORT = PN_USHORT; "An unsigned short value."
+ SHORT = PN_SHORT; "A short value."
+ UINT = PN_UINT; "An unsigned int value."
+ INT = PN_INT; "A signed int value."
+ CHAR = PN_CHAR; "A character value."
+ ULONG = PN_ULONG; "An unsigned long value."
+ LONG = PN_LONG; "A signed long value."
+ TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
+ FLOAT = PN_FLOAT; "A float value."
+ DOUBLE = PN_DOUBLE; "A double value."
+ DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
+ DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
+ DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
+ UUID = PN_UUID; "A UUID value."
+ BINARY = PN_BINARY; "A binary string."
+ STRING = PN_STRING; "A unicode string."
+ SYMBOL = PN_SYMBOL; "A symbolic string."
+ DESCRIBED = PN_DESCRIBED; "A described value."
+ ARRAY = PN_ARRAY; "An array value."
+ LIST = PN_LIST; "A list value."
+ MAP = PN_MAP; "A map value."
+
+ type_names = {
+ NULL: "null",
+ BOOL: "bool",
+ BYTE: "byte",
+ UBYTE: "ubyte",
+ SHORT: "short",
+ USHORT: "ushort",
+ INT: "int",
+ UINT: "uint",
+ CHAR: "char",
+ LONG: "long",
+ ULONG: "ulong",
+ TIMESTAMP: "timestamp",
+ FLOAT: "float",
+ DOUBLE: "double",
+ DECIMAL32: "decimal32",
+ DECIMAL64: "decimal64",
+ DECIMAL128: "decimal128",
+ UUID: "uuid",
+ BINARY: "binary",
+ STRING: "string",
+ SYMBOL: "symbol",
+ DESCRIBED: "described",
+ ARRAY: "array",
+ LIST: "list",
+ MAP: "map"
+ }
+
+ @classmethod
+ def type_name(type): return Data.type_names[type]
+
+ def __init__(self, capacity=16):
+ if type(capacity) in (int, long):
+ self._data = pn_data(capacity)
+ self._free = True
+ else:
+ self._data = capacity
+ self._free = False
+
+ def __del__(self):
+ if self._free and hasattr(self, "_data"):
+ pn_data_free(self._data)
+ del self._data
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, DataException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
+ else:
+ return err
+
+ def clear(self):
+ """
+ Clears the data object.
+ """
+ pn_data_clear(self._data)
+
+ def rewind(self):
+ """
+ Clears current node and sets the parent to the root node. Clearing the
+ current node sets it _before_ the first node, calling next() will advance to
+ the first node.
+ """
+ assert self._data is not None
+ pn_data_rewind(self._data)
+
+ def next(self):
+ """
+ Advances the current node to its next sibling and returns its
+ type. If there is no next sibling the current node remains
+ unchanged and None is returned.
+ """
+ found = pn_data_next(self._data)
+ if found:
+ return self.type()
+ else:
+ return None
+
+ def prev(self):
+ """
+ Advances the current node to its previous sibling and returns its
+ type. If there is no previous sibling the current node remains
+ unchanged and None is returned.
+ """
+ found = pn_data_prev(self._data)
+ if found:
+ return self.type()
+ else:
+ return None
+
+ def enter(self):
+ """
+ Sets the parent node to the current node and clears the current node.
+ Clearing the current node sets it _before_ the first child,
+ call next() advances to the first child.
+ """
+ return pn_data_enter(self._data)
+
+ def exit(self):
+ """
+ Sets the current node to the parent node and the parent node to
+ its own parent.
+ """
+ return pn_data_exit(self._data)
+
+ def lookup(self, name):
+ return pn_data_lookup(self._data, name)
+
+ def narrow(self):
+ pn_data_narrow(self._data)
+
+ def widen(self):
+ pn_data_widen(self._data)
+
+ def type(self):
+ """
+ Returns the type of the current node.
+ """
+ dtype = pn_data_type(self._data)
+ if dtype == -1:
+ return None
+ else:
+ return dtype
+
+ def encode(self):
+ """
+ Returns a representation of the data encoded in AMQP format.
+ """
+ size = 1024
+ while True:
+ cd, enc = pn_data_encode(self._data, size)
+ if cd == PN_OVERFLOW:
+ size *= 2
+ elif cd >= 0:
+ return enc
+ else:
+ self._check(cd)
+
+ def decode(self, encoded):
+ """
+ Decodes the first value from supplied AMQP data and returns the
+ number of bytes consumed.
+
+ @type encoded: binary
+ @param encoded: AMQP encoded binary data
+ """
+ return self._check(pn_data_decode(self._data, encoded))
+
+ def put_list(self):
+ """
+ Puts a list value. Elements may be filled by entering the list
+ node and putting element values.
+
+ >>> data = Data()
+ >>> data.put_list()
+ >>> data.enter()
+ >>> data.put_int(1)
+ >>> data.put_int(2)
+ >>> data.put_int(3)
+ >>> data.exit()
+ """
+ self._check(pn_data_put_list(self._data))
+
+ def put_map(self):
+ """
+ Puts a map value. Elements may be filled by entering the map node
+ and putting alternating key value pairs.
+
+ >>> data = Data()
+ >>> data.put_map()
+ >>> data.enter()
+ >>> data.put_string("key")
+ >>> data.put_string("value")
+ >>> data.exit()
+ """
+ self._check(pn_data_put_map(self._data))
+
+ def put_array(self, described, element_type):
+ """
+ Puts an array value. Elements may be filled by entering the array
+ node and putting the element values. The values must all be of the
+ specified array element type. If an array is described then the
+ first child value of the array is the descriptor and may be of any
+ type.
+
+ >>> data = Data()
+ >>>
+ >>> data.put_array(False, Data.INT)
+ >>> data.enter()
+ >>> data.put_int(1)
+ >>> data.put_int(2)
+ >>> data.put_int(3)
+ >>> data.exit()
+ >>>
+ >>> data.put_array(True, Data.DOUBLE)
+ >>> data.enter()
+ >>> data.put_symbol("array-descriptor")
+ >>> data.put_double(1.1)
+ >>> data.put_double(1.2)
+ >>> data.put_double(1.3)
+ >>> data.exit()
+
+ @type described: bool
+ @param described: specifies whether the array is described
+ @type element_type: int
+ @param element_type: the type of the array elements
+ """
+ self._check(pn_data_put_array(self._data, described, element_type))
+
+ def put_described(self):
+ """
+ Puts a described value. A described node has two children, the
+ descriptor and the value. These are specified by entering the node
+ and putting the desired values.
+
+ >>> data = Data()
+ >>> data.put_described()
+ >>> data.enter()
+ >>> data.put_symbol("value-descriptor")
+ >>> data.put_string("the value")
+ >>> data.exit()
+ """
+ self._check(pn_data_put_described(self._data))
+
+ def put_null(self):
+ """
+ Puts a null value.
+ """
+ self._check(pn_data_put_null(self._data))
+
+ def put_bool(self, b):
+ """
+ Puts a boolean value.
+
+ @param b: a boolean value
+ """
+ self._check(pn_data_put_bool(self._data, b))
+
+ def put_ubyte(self, ub):
+ """
+ Puts an unsigned byte value.
+
+ @param ub: an integral value
+ """
+ self._check(pn_data_put_ubyte(self._data, ub))
+
+ def put_byte(self, b):
+ """
+ Puts a signed byte value.
+
+ @param b: an integral value
+ """
+ self._check(pn_data_put_byte(self._data, b))
+
+ def put_ushort(self, us):
+ """
+ Puts an unsigned short value.
+
+ @param us: an integral value.
+ """
+ self._check(pn_data_put_ushort(self._data, us))
+
+ def put_short(self, s):
+ """
+ Puts a signed short value.
+
+ @param s: an integral value
+ """
+ self._check(pn_data_put_short(self._data, s))
+
+ def put_uint(self, ui):
+ """
+ Puts an unsigned int value.
+
+ @param ui: an integral value
+ """
+ self._check(pn_data_put_uint(self._data, ui))
+
+ def put_int(self, i):
+ """
+ Puts a signed int value.
+
+ @param i: an integral value
+ """
+ self._check(pn_data_put_int(self._data, i))
+
+ def put_char(self, c):
+ """
+ Puts a char value.
+
+ @param c: a single character
+ """
+ self._check(pn_data_put_char(self._data, ord(c)))
+
+ def put_ulong(self, ul):
+ """
+ Puts an unsigned long value.
+
+ @param ul: an integral value
+ """
+ self._check(pn_data_put_ulong(self._data, ul))
+
+ def put_long(self, l):
+ """
+ Puts a signed long value.
+
+ @param l: an integral value
+ """
+ self._check(pn_data_put_long(self._data, l))
+
+ def put_timestamp(self, t):
+ """
+ Puts a timestamp value.
+
+ @param t: an integral value
+ """
+ self._check(pn_data_put_timestamp(self._data, t))
+
+ def put_float(self, f):
+ """
+ Puts a float value.
+
+ @param f: a floating point value
+ """
+ self._check(pn_data_put_float(self._data, f))
+
+ def put_double(self, d):
+ """
+ Puts a double value.
+
+ @param d: a floating point value.
+ """
+ self._check(pn_data_put_double(self._data, d))
+
+ def put_decimal32(self, d):
+ """
+ Puts a decimal32 value.
+
+ @param d: a decimal32 value
+ """
+ self._check(pn_data_put_decimal32(self._data, d))
+
+ def put_decimal64(self, d):
+ """
+ Puts a decimal64 value.
+
+ @param d: a decimal64 value
+ """
+ self._check(pn_data_put_decimal64(self._data, d))
+
+ def put_decimal128(self, d):
+ """
+ Puts a decimal128 value.
+
+ @param d: a decimal128 value
+ """
+ self._check(pn_data_put_decimal128(self._data, d))
+
+ def put_uuid(self, u):
+ """
+ Puts a UUID value.
+
+ @param u: a uuid value
+ """
+ self._check(pn_data_put_uuid(self._data, u.bytes))
+
+ def put_binary(self, b):
+ """
+ Puts a binary value.
+
+ @type b: binary
+ @param b: a binary value
+ """
+ self._check(pn_data_put_binary(self._data, b))
+
+ def put_string(self, s):
+ """
+ Puts a unicode value.
+
+ @type s: unicode
+ @param s: a unicode value
+ """
+ self._check(pn_data_put_string(self._data, s.encode("utf8")))
+
+ def put_symbol(self, s):
+ """
+ Puts a symbolic value.
+
+ @type s: string
+ @param s: the symbol name
+ """
+ self._check(pn_data_put_symbol(self._data, s))
+
+ def get_list(self):
+ """
+ If the current node is a list, return the number of elements,
+ otherwise return zero. List elements can be accessed by entering
+ the list.
+
+ >>> count = data.get_list()
+ >>> data.enter()
+ >>> for i in range(count):
+ ... type = data.next()
+ ... if type == Data.STRING:
+ ... print data.get_string()
+ ... elif type == ...:
+ ... ...
+ >>> data.exit()
+ """
+ return pn_data_get_list(self._data)
+
+ def get_map(self):
+ """
+ If the current node is a map, return the number of child elements,
+ otherwise return zero. Key value pairs can be accessed by entering
+ the map.
+
+ >>> count = data.get_map()
+ >>> data.enter()
+ >>> for i in range(count/2):
+ ... type = data.next()
+ ... if type == Data.STRING:
+ ... print data.get_string()
+ ... elif type == ...:
+ ... ...
+ >>> data.exit()
+ """
+ return pn_data_get_map(self._data)
+
+ def get_array(self):
+ """
+ If the current node is an array, return a tuple of the element
+ count, a boolean indicating whether the array is described, and
+ the type of each element, otherwise return (0, False, None). Array
+ data can be accessed by entering the array.
+
+ >>> # read an array of strings with a symbolic descriptor
+ >>> count, described, type = data.get_array()
+ >>> data.enter()
+ >>> data.next()
+ >>> print "Descriptor:", data.get_symbol()
+ >>> for i in range(count):
+ ... data.next()
+ ... print "Element:", data.get_string()
+ >>> data.exit()
+ """
+ count = pn_data_get_array(self._data)
+ described = pn_data_is_array_described(self._data)
+ type = pn_data_get_array_type(self._data)
+ if type == -1:
+ type = None
+ return count, described, type
+
+ def is_described(self):
+ """
+ Checks if the current node is a described value. The descriptor
+ and value may be accessed by entering the described value.
+
+ >>> # read a symbolically described string
+ >>> assert data.is_described() # will error if the current node is not described
+ >>> data.enter()
+ >>> print data.get_symbol()
+ >>> print data.get_string()
+ >>> data.exit()
+ """
+ return pn_data_is_described(self._data)
+
+ def is_null(self):
+ """
+ Checks if the current node is a null.
+ """
+ return pn_data_is_null(self._data)
+
+ def get_bool(self):
+ """
+ If the current node is a boolean, returns its value, returns False
+ otherwise.
+ """
+ return pn_data_get_bool(self._data)
+
+ def get_ubyte(self):
+ """
+ If the current node is an unsigned byte, returns its value,
+ returns 0 otherwise.
+ """
+ return pn_data_get_ubyte(self._data)
+
+ def get_byte(self):
+ """
+ If the current node is a signed byte, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_byte(self._data)
+
+ def get_ushort(self):
+ """
+ If the current node is an unsigned short, returns its value,
+ returns 0 otherwise.
+ """
+ return pn_data_get_ushort(self._data)
+
+ def get_short(self):
+ """
+ If the current node is a signed short, returns its value, returns
+ 0 otherwise.
+ """
+ return pn_data_get_short(self._data)
+
+ def get_uint(self):
+ """
+ If the current node is an unsigned int, returns its value, returns
+ 0 otherwise.
+ """
+ return pn_data_get_uint(self._data)
+
+ def get_int(self):
+ """
+ If the current node is a signed int, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_int(self._data)
+
+ def get_char(self):
+ """
+ If the current node is a char, returns its value, returns 0
+ otherwise.
+ """
+ return char(unichr(pn_data_get_char(self._data)))
+
+ def get_ulong(self):
+ """
+ If the current node is an unsigned long, returns its value,
+ returns 0 otherwise.
+ """
+ return ulong(pn_data_get_ulong(self._data))
+
+ def get_long(self):
+ """
+ If the current node is an signed long, returns its value, returns
+ 0 otherwise.
+ """
+ return pn_data_get_long(self._data)
+
+ def get_timestamp(self):
+ """
+ If the current node is a timestamp, returns its value, returns 0
+ otherwise.
+ """
+ return timestamp(pn_data_get_timestamp(self._data))
+
+ def get_float(self):
+ """
+ If the current node is a float, returns its value, raises 0
+ otherwise.
+ """
+ return pn_data_get_float(self._data)
+
+ def get_double(self):
+ """
+ If the current node is a double, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_double(self._data)
+
+ # XXX: need to convert
+ def get_decimal32(self):
+ """
+ If the current node is a decimal32, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_decimal32(self._data)
+
+ # XXX: need to convert
+ def get_decimal64(self):
+ """
+ If the current node is a decimal64, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_decimal64(self._data)
+
+ # XXX: need to convert
+ def get_decimal128(self):
+ """
+ If the current node is a decimal128, returns its value, returns 0
+ otherwise.
+ """
+ return pn_data_get_decimal128(self._data)
+
+ def get_uuid(self):
+ """
+ If the current node is a UUID, returns its value, returns None
+ otherwise.
+ """
+ if pn_data_type(self._data) == Data.UUID:
+ return uuid.UUID(bytes=pn_data_get_uuid(self._data))
+ else:
+ return None
+
+ def get_binary(self):
+ """
+ If the current node is binary, returns its value, returns ""
+ otherwise.
+ """
+ return pn_data_get_binary(self._data)
+
+ def get_string(self):
+ """
+ If the current node is a string, returns its value, returns ""
+ otherwise.
+ """
+ return pn_data_get_string(self._data).decode("utf8")
+
+ def get_symbol(self):
+ """
+ If the current node is a symbol, returns its value, returns ""
+ otherwise.
+ """
+ return symbol(pn_data_get_symbol(self._data))
+
+ def copy(self, src):
+ self._check(pn_data_copy(self._data, src._data))
+
+ def format(self):
+ sz = 16
+ while True:
+ err, result = pn_data_format(self._data, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return result
+
+ def dump(self):
+ pn_data_dump(self._data)
+
+ def put_dict(self, d):
+ self.put_map()
+ self.enter()
+ try:
+ for k, v in d.items():
+ self.put_object(k)
+ self.put_object(v)
+ finally:
+ self.exit()
+
+ def get_dict(self):
+ if self.enter():
+ try:
+ result = {}
+ while self.next():
+ k = self.get_object()
+ if self.next():
+ v = self.get_object()
+ else:
+ v = None
+ result[k] = v
+ finally:
+ self.exit()
+ return result
+
+ def put_sequence(self, s):
+ self.put_list()
+ self.enter()
+ try:
+ for o in s:
+ self.put_object(o)
+ finally:
+ self.exit()
+
+ def get_sequence(self):
+ if self.enter():
+ try:
+ result = []
+ while self.next():
+ result.append(self.get_object())
+ finally:
+ self.exit()
+ return result
+
+ def get_py_described(self):
+ if self.enter():
+ try:
+ self.next()
+ descriptor = self.get_object()
+ self.next()
+ value = self.get_object()
+ finally:
+ self.exit()
+ return Described(descriptor, value)
+
+ def put_py_described(self, d):
+ self.put_described()
+ self.enter()
+ try:
+ self.put_object(d.descriptor)
+ self.put_object(d.value)
+ finally:
+ self.exit()
+
+ def get_py_array(self):
+ """
+ If the current node is an array, return an Array object
+ representing the array and its contents. Otherwise return None.
+ This is a convenience wrapper around get_array, enter, etc.
+ """
+
+ count, described, type = self.get_array()
+ if type is None: return None
+ if self.enter():
+ try:
+ if described:
+ self.next()
+ descriptor = self.get_object()
+ else:
+ descriptor = UNDESCRIBED
+ elements = []
+ while self.next():
+ elements.append(self.get_object())
+ finally:
+ self.exit()
+ return Array(descriptor, type, *elements)
+
+ def put_py_array(self, a):
+ described = a.descriptor != UNDESCRIBED
+ self.put_array(described, a.type)
+ self.enter()
+ try:
+ if described:
+ self.put_object(a.descriptor)
+ for e in a.elements:
+ self.put_object(e)
+ finally:
+ self.exit()
+
+ put_mappings = {
+ None.__class__: lambda s, _: s.put_null(),
+ bool: put_bool,
+ dict: put_dict,
+ list: put_sequence,
+ tuple: put_sequence,
+ unicode: put_string,
+ bytes: put_binary,
+ symbol: put_symbol,
+ int: put_long,
+ char: put_char,
+ long: put_long,
+ ulong: put_ulong,
+ timestamp: put_timestamp,
+ float: put_double,
+ uuid.UUID: put_uuid,
+ Described: put_py_described,
+ Array: put_py_array
+ }
+ get_mappings = {
+ NULL: lambda s: None,
+ BOOL: get_bool,
+ BYTE: get_byte,
+ UBYTE: get_ubyte,
+ SHORT: get_short,
+ USHORT: get_ushort,
+ INT: get_int,
+ UINT: get_uint,
+ CHAR: get_char,
+ LONG: get_long,
+ ULONG: get_ulong,
+ TIMESTAMP: get_timestamp,
+ FLOAT: get_float,
+ DOUBLE: get_double,
+ DECIMAL32: get_decimal32,
+ DECIMAL64: get_decimal64,
+ DECIMAL128: get_decimal128,
+ UUID: get_uuid,
+ BINARY: get_binary,
+ STRING: get_string,
+ SYMBOL: get_symbol,
+ DESCRIBED: get_py_described,
+ ARRAY: get_py_array,
+ LIST: get_sequence,
+ MAP: get_dict
+ }
+
+
+ def put_object(self, obj):
+ putter = self.put_mappings[obj.__class__]
+ putter(self, obj)
+
+ def get_object(self):
+ type = self.type()
+ if type is None: return None
+ getter = self.get_mappings.get(type)
+ if getter:
+ return getter(self)
+ else:
+ return UnmappedType(str(type))
+
+class ConnectionException(ProtonException):
+ pass
+
+class Endpoint(object):
+
+ LOCAL_UNINIT = PN_LOCAL_UNINIT
+ REMOTE_UNINIT = PN_REMOTE_UNINIT
+ LOCAL_ACTIVE = PN_LOCAL_ACTIVE
+ REMOTE_ACTIVE = PN_REMOTE_ACTIVE
+ LOCAL_CLOSED = PN_LOCAL_CLOSED
+ REMOTE_CLOSED = PN_REMOTE_CLOSED
+
+ def __init__(self):
+ self.condition = None
+ self._release_invoked = False
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if not self._release_invoked:
+ for c in self._children:
+ c._release()
+ self._free_resource()
+ self.connection._releasing(self)
+ self._release_invoked = True
+
+ def _update_cond(self):
+ obj2cond(self.condition, self._get_cond_impl())
+
+ @property
+ def remote_condition(self):
+ return cond2obj(self._get_remote_cond_impl())
+
+ # the following must be provided by subclasses
+ def _get_cond_impl(self):
+ assert False, "Subclass must override this!"
+
+ def _get_remote_cond_impl(self):
+ assert False, "Subclass must override this!"
+
+class Condition:
+
+ def __init__(self, name, description=None, info=None):
+ self.name = name
+ self.description = description
+ self.info = info
+
+ def __repr__(self):
+ return "Condition(%s)" % ", ".join([repr(x) for x in
+ (self.name, self.description, self.info)
+ if x])
+
+ def __eq__(self, o):
+ if not isinstance(o, Condition): return False
+ return self.name == o.name and \
+ self.description == o.description and \
+ self.info == o.info
+
+def obj2cond(obj, cond):
+ pn_condition_clear(cond)
+ if obj:
+ pn_condition_set_name(cond, str(obj.name))
+ pn_condition_set_description(cond, obj.description)
+ info = Data(pn_condition_info(cond))
+ if obj.info:
+ info.put_object(obj.info)
+
+def cond2obj(cond):
+ if pn_condition_is_set(cond):
+ return Condition(pn_condition_get_name(cond),
+ pn_condition_get_description(cond),
+ dat2obj(pn_condition_info(cond)))
+ else:
+ return None
+
+def dat2obj(dimpl):
+ if dimpl:
+ d = Data(dimpl)
+ d.rewind()
+ d.next()
+ obj = d.get_object()
+ d.rewind()
+ return obj
+
+def obj2dat(obj, dimpl):
+ if obj is not None:
+ d = Data(dimpl)
+ d.put_object(obj)
+
+def secs2millis(secs):
+ return long(secs*1000)
+
+def millis2secs(millis):
+ return float(millis)/1000.0
+
+class Connection(Endpoint):
+
+ @staticmethod
+ def _wrap_connection(c_conn):
+ """Maintain only a single instance of this class for each Connection
+ object that exists in the the C Engine. This is done by storing a (weak)
+ reference to the python instance in the context field of the C object.
+ """
+ if not c_conn: return None
+ py_conn = pn_void2py(pn_connection_get_context(c_conn))
+ if py_conn: return py_conn
+ wrapper = Connection(_conn=c_conn)
+ return wrapper
+
+ def __init__(self, _conn=None):
+ Endpoint.__init__(self)
+ if _conn:
+ self._conn = _conn
+ else:
+ self._conn = pn_connection()
+ pn_connection_set_context(self._conn, pn_py2void(self))
+ self.offered_capabilities = None
+ self.desired_capabilities = None
+ self.properties = None
+ self._sessions = set()
+
+ def __del__(self):
+ if hasattr(self, "_conn") and self._conn:
+ self._release()
+
+ def free(self):
+ self._release()
+
+ @property
+ def _children(self):
+ return self._sessions
+
+ @property
+ def connection(self):
+ return self
+
+ def _free_resource(self):
+ pn_connection_free(self._conn)
+
+ def _released(self):
+ self._conn = None
+
+ def _releasing(self, child):
+ coll = getattr(self, "_collector", None)
+ if coll: coll = coll()
+ if coll:
+ coll._contexts.add(child)
+ else:
+ child._released()
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, ConnectionException)
+ raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
+ else:
+ return err
+
+ def _get_cond_impl(self):
+ return pn_connection_condition(self._conn)
+
+ def _get_remote_cond_impl(self):
+ return pn_connection_remote_condition(self._conn)
+
+ def collect(self, collector):
+ if collector is None:
+ pn_connection_collect(self._conn, None)
+ else:
+ pn_connection_collect(self._conn, collector._impl)
+ self._collector = weakref.ref(collector)
+
+ def _get_container(self):
+ return pn_connection_get_container(self._conn)
+ def _set_container(self, name):
+ return pn_connection_set_container(self._conn, name)
+
+ container = property(_get_container, _set_container)
+
+ def _get_hostname(self):
+ return pn_connection_get_hostname(self._conn)
+ def _set_hostname(self, name):
+ return pn_connection_set_hostname(self._conn, name)
+
+ hostname = property(_get_hostname, _set_hostname)
+
+ @property
+ def remote_container(self):
+ return pn_connection_remote_container(self._conn)
+
+ @property
+ def remote_hostname(self):
+ return pn_connection_remote_hostname(self._conn)
+
+ @property
+ def remote_offered_capabilities(self):
+ return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
+
+ @property
+ def remote_desired_capabilities(self):
+ return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
+
+ @property
+ def remote_properties(self):
+ return dat2obj(pn_connection_remote_properties(self._conn))
+
+ def open(self):
+ obj2dat(self.offered_capabilities,
+ pn_connection_offered_capabilities(self._conn))
+ obj2dat(self.desired_capabilities,
+ pn_connection_desired_capabilities(self._conn))
+ obj2dat(self.properties, pn_connection_properties(self._conn))
+ pn_connection_open(self._conn)
+
+ def close(self):
+ self._update_cond()
+ pn_connection_close(self._conn)
+
+ @property
+ def state(self):
+ return pn_connection_state(self._conn)
+
+ def session(self):
+ return Session._wrap_session(pn_session(self._conn))
+
+ def session_head(self, mask):
+ return Session._wrap_session(pn_session_head(self._conn, mask))
+
+ def link_head(self, mask):
+ return Link._wrap_link(pn_link_head(self._conn, mask))
+
+ @property
+ def work_head(self):
+ return Delivery._wrap_delivery(pn_work_head(self._conn))
+
+ @property
+ def error(self):
+ return pn_error_code(pn_connection_error(self._conn))
+
+class SessionException(ProtonException):
+ pass
+
+class Session(Endpoint):
+
+ @staticmethod
+ def _wrap_session(c_ssn):
+ """Maintain only a single instance of this class for each Session object that
+ exists in the C Engine.
+ """
+ if c_ssn is None: return None
+ py_ssn = pn_void2py(pn_session_get_context(c_ssn))
+ if py_ssn: return py_ssn
+ wrapper = Session(c_ssn)
+ return wrapper
+
+ def __init__(self, ssn):
+ Endpoint.__init__(self)
+ self._ssn = ssn
+ pn_session_set_context(self._ssn, pn_py2void(self))
+ self._links = set()
+ self.connection._sessions.add(self)
+
+ @property
+ def _children(self):
+ return self._links
+
+ def _free_resource(self):
+ pn_session_free(self._ssn)
+
+ def _released(self):
+ self._ssn = None
+
+ def free(self):
+ """Release the Session, freeing its resources.
+
+ Call this when you no longer need the session. This will allow the
+ session's resources to be reclaimed. Once called, you should no longer
+ reference the session.
+
+ """
+ self.connection._sessions.remove(self)
+ self._release()
+
+ def _get_cond_impl(self):
+ return pn_session_condition(self._ssn)
+
+ def _get_remote_cond_impl(self):
+ return pn_session_remote_condition(self._ssn)
+
+ def _get_incoming_capacity(self):
+ return pn_session_get_incoming_capacity(self._ssn)
+
+ def _set_incoming_capacity(self, capacity):
+ pn_session_set_incoming_capacity(self._ssn, capacity)
+
+ incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
+
+ @property
+ def outgoing_bytes(self):
+ return pn_session_outgoing_bytes(self._ssn)
+
+ @property
+ def incoming_bytes(self):
+ return pn_session_incoming_bytes(self._ssn)
+
+ def open(self):
+ pn_session_open(self._ssn)
+
+ def close(self):
+ self._update_cond()
+ pn_session_close(self._ssn)
+
+ def next(self, mask):
+ return Session._wrap_session(pn_session_next(self._ssn, mask))
+
+ @property
+ def state(self):
+ return pn_session_state(self._ssn)
+
+ @property
+ def connection(self):
+ return Connection._wrap_connection(pn_session_connection(self._ssn))
+
+ def sender(self, name):
+ return Link._wrap_link(pn_sender(self._ssn, name))
+
+ def receiver(self, name):
+ return Link._wrap_link(pn_receiver(self._ssn, name))
+
+class LinkException(ProtonException):
+ pass
+
+class Link(Endpoint):
+
+ SND_UNSETTLED = PN_SND_UNSETTLED
+ SND_SETTLED = PN_SND_SETTLED
+ SND_MIXED = PN_SND_MIXED
+
+ RCV_FIRST = PN_RCV_FIRST
+ RCV_SECOND = PN_RCV_SECOND
+
+ @staticmethod
+ def _wrap_link(c_link):
+ """Maintain only a single instance of this class for each Session object that
+ exists in the C Engine.
+ """
+ if c_link is None: return None
+ py_link = pn_void2py(pn_link_get_context(c_link))
+ if py_link: return py_link
+ if pn_link_is_sender(c_link):
+ wrapper = Sender(c_link)
+ else:
+ wrapper = Receiver(c_link)
+ return wrapper
+
+ def __init__(self, c_link):
+ Endpoint.__init__(self)
+ self._link = c_link
+ pn_link_set_context(self._link, pn_py2void(self))
+ self._deliveries = set()
+ self.session._links.add(self)
+
+ @property
+ def _children(self):
+ return self._deliveries
+
+ def _free_resource(self):
+ pn_link_free(self._link)
+
+ def _released(self):
+ self._link = None
+
+ def free(self):
+ """Release the Link, freeing its resources"""
+ self.session._links.remove(self)
+ self._release()
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, LinkException)
+ raise exc("[%s]: %s" % (err, pn_link_error(self._link)))
+ else:
+ return err
+
+ def _get_cond_impl(self):
+ return pn_link_condition(self._link)
+
+ def _get_remote_cond_impl(self):
+ return pn_link_remote_condition(self._link)
+
+ def open(self):
+ pn_link_open(self._link)
+
+ def close(self):
+ self._update_cond()
+ pn_link_close(self._link)
+
+ @property
+ def state(self):
+ return pn_link_state(self._link)
+
+ @property
+ def source(self):
+ return Terminus(pn_link_source(self._link))
+
+ @property
+ def target(self):
+ return Terminus(pn_link_target(self._link))
+
+ @property
+ def remote_source(self):
+ return Terminus(pn_link_remote_source(self._link))
+ @property
+ def remote_target(self):
+ return Terminus(pn_link_remote_target(self._link))
+
+ @property
+ def session(self):
+ return Session._wrap_session(pn_link_session(self._link))
+
+ @property
+ def connection(self):
+ return self.session.connection
+
+ def delivery(self, tag):
+ return Delivery._wrap_delivery(pn_delivery(self._link, tag))
+
+ @property
+ def current(self):
+ return Delivery._wrap_delivery(pn_link_current(self._link))
+
+ def advance(self):
+ return pn_link_advance(self._link)
+
+ @property
+ def unsettled(self):
+ return pn_link_unsettled(self._link)
+
+ @property
+ def credit(self):
+ return pn_link_credit(self._link)
+
+ @property
+ def available(self):
+ return pn_link_available(self._link)
+
+ @property
+ def queued(self):
+ return pn_link_queued(self._link)
+
+ def next(self, mask):
+ return Link._wrap_link(pn_link_next(self._link, mask))
+
+ @property
+ def name(self):
+ return pn_link_name(self._link)
+
+ @property
+ def is_sender(self):
+ return pn_link_is_sender(self._link)
+
+ @property
+ def is_receiver(self):
+ return pn_link_is_receiver(self._link)
+
+ @property
+ def remote_snd_settle_mode(self):
+ return pn_link_remote_snd_settle_mode(self._link)
+
+ @property
+ def remote_rcv_settle_mode(self):
+ return pn_link_remote_rcv_settle_mode(self._link)
+
+ def _get_snd_settle_mode(self):
+ return pn_link_snd_settle_mode(self._link)
+ def _set_snd_settle_mode(self, mode):
+ pn_link_set_snd_settle_mode(self._link, mode)
+ snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
+
+ def _get_rcv_settle_mode(self):
+ return pn_link_rcv_settle_mode(self._link)
+ def _set_rcv_settle_mode(self, mode):
+ pn_link_set_rcv_settle_mode(self._link, mode)
+ rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
+
+ def drained(self):
+ return pn_link_drained(self._link)
+
+ def detach(self):
+ return pn_link_detach(self._link)
+
+class Terminus(object):
+
+ UNSPECIFIED = PN_UNSPECIFIED
+ SOURCE = PN_SOURCE
+ TARGET = PN_TARGET
+ COORDINATOR = PN_COORDINATOR
+
+ NONDURABLE = PN_NONDURABLE
+ CONFIGURATION = PN_CONFIGURATION
+ DELIVERIES = PN_DELIVERIES
+
+ DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
+ DIST_MODE_COPY = PN_DIST_MODE_COPY
+ DIST_MODE_MOVE = PN_DIST_MODE_MOVE
+
+ def __init__(self, impl):
+ self._impl = impl
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, LinkException)
+ raise exc("[%s]" % err)
+ else:
+ return err
+
+ def _get_type(self):
+ return pn_terminus_get_type(self._impl)
+ def _set_type(self, type):
+ self._check(pn_terminus_set_type(self._impl, type))
+ type = property(_get_type, _set_type)
+
+ def _get_address(self):
+ return pn_terminus_get_address(self._impl)
+ def _set_address(self, address):
+ self._check(pn_terminus_set_address(self._impl, address))
+ address = property(_get_address, _set_address)
+
+ def _get_durability(self):
+ return pn_terminus_get_durability(self._impl)
+ def _set_durability(self, seconds):
+ self._check(pn_terminus_set_durability(self._impl, seconds))
+ durability = property(_get_durability, _set_durability)
+
+ def _get_expiry_policy(self):
+ return pn_terminus_get_expiry_policy(self._impl)
+ def _set_expiry_policy(self, seconds):
+ self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
+ expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
+
+ def _get_timeout(self):
+ return pn_terminus_get_timeout(self._impl)
+ def _set_timeout(self, seconds):
+ self._check(pn_terminus_set_timeout(self._impl, seconds))
+ timeout = property(_get_timeout, _set_timeout)
+
+ def _is_dynamic(self):
+ return pn_terminus_is_dynamic(self._impl)
+ def _set_dynamic(self, dynamic):
+ self._check(pn_terminus_set_dynamic(self._impl, dynamic))
+ dynamic = property(_is_dynamic, _set_dynamic)
+
+ def _get_distribution_mode(self):
+ return pn_terminus_get_distribution_mode(self._impl)
+ def _set_distribution_mode(self, mode):
+ self._check(pn_terminus_set_distribution_mode(self._impl, mode))
+ distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
+
+ @property
+ def properties(self):
+ return Data(pn_terminus_properties(self._impl))
+
+ @property
+ def capabilities(self):
+ return Data(pn_terminus_capabilities(self._impl))
+
+ @property
+ def outcomes(self):
+ return Data(pn_terminus_outcomes(self._impl))
+
+ @property
+ def filter(self):
+ return Data(pn_terminus_filter(self._impl))
+
+ def copy(self, src):
+ self._check(pn_terminus_copy(self._impl, src._impl))
+
+class Sender(Link):
+
+ def __init__(self, c_link):
+ super(Sender, self).__init__(c_link)
+
+ def offered(self, n):
+ pn_link_offered(self._link, n)
+
+ def send(self, bytes):
+ return self._check(pn_link_send(self._link, bytes))
+
+class Receiver(Link):
+
+ def __init__(self, c_link):
+ super(Receiver, self).__init__(c_link)
+
+ def flow(self, n):
+ pn_link_flow(self._link, n)
+
+ def recv(self, limit):
+ n, bytes = pn_link_recv(self._link, limit)
+ if n == PN_EOS:
+ return None
+ else:
+ self._check(n)
+ return bytes
+
+ def drain(self, n):
+ pn_link_drain(self._link, n)
+
+ def draining(self):
+ return pn_link_draining(self._link)
+
+class NamedInt(int):
+
+ values = {}
+
+ def __new__(cls, i, name):
+ ni = super(NamedInt, cls).__new__(cls, i)
+ cls.values[i] = ni
+ return ni
+
+ def __init__(self, i, name):
+ self.name = name
+
+ def __repr__(self):
+ return self.name
+
+ def __str__(self):
+ return self.name
+
+ @classmethod
+ def get(cls, i):
+ return cls.values.get(i, i)
+
+class DispositionType(NamedInt):
+ values = {}
+
+class Disposition(object):
+
+ RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
+ ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
+ REJECTED = DispositionType(PN_REJECTED, "REJECTED")
+ RELEASED = DispositionType(PN_RELEASED, "RELEASED")
+ MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
+
+ def __init__(self, impl, local):
+ self._impl = impl
+ self.local = local
+ self._data = None
+ self._condition = None
+ self._annotations = None
+
+ @property
+ def type(self):
+ return DispositionType.get(pn_disposition_type(self._impl))
+
+ def _get_section_number(self):
+ return pn_disposition_get_section_number(self._impl)
+ def _set_section_number(self, n):
+ pn_disposition_set_section_number(self._impl, n)
+ section_number = property(_get_section_number, _set_section_number)
+
+ def _get_section_offset(self):
+ return pn_disposition_get_section_offset(self._impl)
+ def _set_section_offset(self, n):
+ pn_disposition_set_section_offset(self._impl, n)
+ section_offset = property(_get_section_offset, _set_section_offset)
+
+ def _get_failed(self):
+ return pn_disposition_is_failed(self._impl)
+ def _set_failed(self, b):
+ pn_disposition_set_failed(self._impl, b)
+ failed = property(_get_failed, _set_failed)
+
+ def _get_undeliverable(self):
+ return pn_disposition_is_undeliverable(self._impl)
+ def _set_undeliverable(self, b):
+ pn_disposition_set_undeliverable(self._impl, b)
+ undeliverable = property(_get_undeliverable, _set_undeliverable)
+
+ def _get_data(self):
+ if self.local:
+ return self._data
+ else:
+ return dat2obj(pn_disposition_data(self._impl))
+ def _set_data(self, obj):
+ if self.local:
+ self._data = obj
+ else:
+ raise AttributeError("data attribute is read-only")
+ data = property(_get_data, _set_data)
+
+ def _get_annotations(self):
+ if self.local:
+ return self._annotations
+ else:
+ return dat2obj(pn_disposition_annotations(self._impl))
+ def _set_annotations(self, obj):
+ if self.local:
+ self._annotations = obj
+ else:
+ raise AttributeError("annotations attribute is read-only")
+ annotations = property(_get_annotations, _set_annotations)
+
+ def _get_condition(self):
+ if self.local:
+ return self._condition
+ else:
+ return cond2obj(pn_disposition_condition(self._impl))
+ def _set_condition(self, obj):
+ if self.local:
+ self._condition = obj
+ else:
+ raise AttributeError("condition attribute is read-only")
+ condition = property(_get_condition, _set_condition)
+
+class Delivery(object):
+
+ RECEIVED = Disposition.RECEIVED
+ ACCEPTED = Disposition.ACCEPTED
+ REJECTED = Disposition.REJECTED
+ RELEASED = Disposition.RELEASED
+ MODIFIED = Disposition.MODIFIED
+
+ @staticmethod
+ def _wrap_delivery(c_dlv):
+ """Maintain only a single instance of this class for each Delivery object that
+ exists in the C Engine.
+ """
+ if not c_dlv: return None
+ py_dlv = pn_void2py(pn_delivery_get_context(c_dlv))
+ if py_dlv: return py_dlv
+ wrapper = Delivery(c_dlv)
+ return wrapper
+
+ def __init__(self, dlv):
+ self._dlv = dlv
+ pn_delivery_set_context(self._dlv, pn_py2void(self))
+ self.local = Disposition(pn_delivery_local(self._dlv), True)
+ self.remote = Disposition(pn_delivery_remote(self._dlv), False)
+ self.link._deliveries.add(self)
+
+ def __del__(self):
+ self._release()
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._dlv:
+ pn_delivery_set_context(self._dlv, pn_py2void(None))
+ pn_delivery_settle(self._dlv)
+ self._dlv = None
+
+ @property
+ def released(self):
+ return self._dlv is None
+
+ @property
+ def tag(self):
+ return pn_delivery_tag(self._dlv)
+
+ @property
+ def writable(self):
+ return pn_delivery_writable(self._dlv)
+
+ @property
+ def readable(self):
+ return pn_delivery_readable(self._dlv)
+
+ @property
+ def updated(self):
+ return pn_delivery_updated(self._dlv)
+
+ def update(self, state):
+ obj2dat(self.local._data, pn_disposition_data(self.local._impl))
+ obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
+ obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
+ pn_delivery_update(self._dlv, state)
+
+ @property
+ def pending(self):
+ return pn_delivery_pending(self._dlv)
+
+ @property
+ def partial(self):
+ return pn_delivery_partial(self._dlv)
+
+ @property
+ def local_state(self):
+ return DispositionType.get(pn_delivery_local_state(self._dlv))
+
+ @property
+ def remote_state(self):
+ return DispositionType.get(pn_delivery_remote_state(self._dlv))
+
+ @property
+ def settled(self):
+ return pn_delivery_settled(self._dlv)
+
+ def settle(self):
+ """Release the delivery"""
+ self.link._deliveries.remove(self)
+ self._release()
+
+ @property
+ def work_next(self):
+ return Delivery._wrap_delivery(pn_work_next(self._dlv))
+
+ @property
+ def link(self):
+ return Link._wrap_link(pn_delivery_link(self._dlv))
+
+class TransportException(ProtonException):
+ pass
+
+class Transport(object):
+
+ TRACE_OFF = PN_TRACE_OFF
+ TRACE_DRV = PN_TRACE_DRV
+ TRACE_FRM = PN_TRACE_FRM
+ TRACE_RAW = PN_TRACE_RAW
+
+ CLIENT = 1
+ SERVER = 2
+
+ @staticmethod
+ def _wrap_transport(c_trans):
+ if not c_trans: return None
+ wrapper = Transport(_trans=c_trans)
+ return wrapper
+
+ def __init__(self, mode=None, _trans=None):
+ if not mode and not _trans:
+ self._trans = pn_transport()
+ elif not mode:
+ self._shared_trans = True
+ self._trans = _trans
+ elif mode==Transport.CLIENT:
+ self._trans = pn_transport()
+ elif mode==Transport.SERVER:
+ self._trans = pn_transport()
+ pn_transport_set_server(self._trans)
+ else:
+ raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
+ self._sasl = None
+ self._ssl = None
+
+ def __del__(self):
+ if hasattr(self, "_trans"):
+ if not hasattr(self, "_shared_trans"):
+ pn_transport_free(self._trans)
+ if hasattr(self, "_sasl") and self._sasl:
+ # pn_transport_free deallocs the C sasl associated with the
+ # transport, so erase the reference if a SASL object was used.
+ self._sasl._sasl = None
+ self._sasl = None
+ if hasattr(self, "_ssl") and self._ssl:
+ # ditto the owned c SSL object
+ self._ssl._ssl = None
+ self._ssl = None
+ del self._trans
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, TransportException)
+ raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
+ else:
+ return err
+
+ def bind(self, connection):
+ """Assign a connection to the transport"""
+ self._check(pn_transport_bind(self._trans, connection._conn))
+ # keep python connection from being garbage collected:
+ self._connection = connection
+
+ def unbind(self):
+ """Release the connection"""
+ self._check(pn_transport_unbind(self._trans))
+ self._connection = None
+
+ def trace(self, n):
+ pn_transport_trace(self._trans, n)
+
+ def tick(self, now):
+ """Process any timed events (like heartbeat generation).
+ now = seconds since epoch (float).
+ """
+ return millis2secs(pn_transport_tick(self._trans, secs2millis(now)))
+
+ def capacity(self):
+ c = pn_transport_capacity(self._trans)
+ if c >= PN_EOS:
+ return c
+ else:
+ return self._check(c)
+
+ def push(self, bytes):
+ n = self._check(pn_transport_push(self._trans, bytes))
+ if n != len(bytes):
+ raise OverflowError("unable to process all bytes")
+
+ def close_tail(self):
+ self._check(pn_transport_close_tail(self._trans))
+
+ def pending(self):
+ p = pn_transport_pending(self._trans)
+ if p >= PN_EOS:
+ return p
+ else:
+ return self._check(p)
+
+ def peek(self, size):
+ cd, out = pn_transport_peek(self._trans, size)
+ if cd == PN_EOS:
+ return None
+ else:
+ self._check(cd)
+ return out
+
+ def pop(self, size):
+ pn_transport_pop(self._trans, size)
+
+ def close_head(self):
+ self._check(pn_transport_close_head(self._trans))
+
+ @property
+ def closed(self):
+ return pn_transport_closed(self._trans)
+
+ # AMQP 1.0 max-frame-size
+ def _get_max_frame_size(self):
+ return pn_transport_get_max_frame(self._tra
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org