You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/04 15:57:36 UTC

[04/41] qpid-proton git commit: PROTON-1850: Split up proton __init__.py into multiple files - Reformatted python source to (mostly) PEP-8 standards - Control what gets exported from __init__ by restricting what it imports - Move most of the reactor impl

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_exceptions.py
----------------------------------------------------------------------
diff --git a/python/proton/_exceptions.py b/python/proton/_exceptions.py
new file mode 100644
index 0000000..47420c2
--- /dev/null
+++ b/python/proton/_exceptions.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_TIMEOUT, PN_INTR
+
+
+class ProtonException(Exception):
+    """
+    The root of the proton exception hierarchy. All proton exception
+    classes derive from this exception.
+    """
+    pass
+
+
+class Timeout(ProtonException):
+    """
+    A timeout exception indicates that a blocking operation has timed
+    out.
+    """
+    pass
+
+
+class Interrupt(ProtonException):
+    """
+    An interrupt exception indicates that a blocking operation was interrupted.
+    """
+    pass
+
+
+EXCEPTIONS = {
+    PN_TIMEOUT: Timeout,
+    PN_INTR: Interrupt
+}
+
+
+class MessageException(ProtonException):
+    """
+    The MessageException class is the root of the message exception
+    hierarchy. All exceptions generated by the Message class derive from
+    this exception.
+    """
+    pass
+
+
+class DataException(ProtonException):
+    """
+    The DataException class is the root of the Data exception hierarchy.
+    All exceptions raised by the Data class extend this exception.
+    """
+    pass
+
+
+class TransportException(ProtonException):
+    pass
+
+
+class SSLException(TransportException):
+    pass
+
+
+class SSLUnavailable(SSLException):
+    pass
+
+
+class ConnectionException(ProtonException):
+    pass
+
+
+class SessionException(ProtonException):
+    pass
+
+
+class LinkException(ProtonException):
+    pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_message.py
----------------------------------------------------------------------
diff --git a/python/proton/_message.py b/python/proton/_message.py
new file mode 100644
index 0000000..32a8c72
--- /dev/null
+++ b/python/proton/_message.py
@@ -0,0 +1,465 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_STATUS_SETTLED, PN_DEFAULT_PRIORITY, PN_STATUS_MODIFIED, PN_STATUS_RELEASED, PN_STATUS_ABORTED, \
+    PN_STATUS_REJECTED, PN_STATUS_PENDING, PN_STATUS_UNKNOWN, PN_STATUS_ACCEPTED, \
+    PN_OVERFLOW, \
+    pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \
+    pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \
+    pn_message_get_content_encoding, pn_message_body, \
+    pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \
+    pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \
+    pn_message_is_first_acquirer, pn_message_set_priority, \
+    pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \
+    pn_message_set_user_id, pn_message_set_group_id, \
+    pn_message_id, pn_message_clear, pn_message_set_durable, \
+    pn_message_set_first_acquirer, pn_message_get_delivery_count, \
+    pn_message_decode, pn_message_set_reply_to_group_id, \
+    pn_message_get_group_sequence, pn_message_set_reply_to, \
+    pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \
+    pn_message_instructions, pn_message_get_content_type, \
+    pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \
+    pn_message_set_group_sequence, pn_message_set_inferred, \
+    pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
+
+from . import _compat
+from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._data import Data, ulong, symbol
+from ._endpoints import Link
+from ._exceptions import EXCEPTIONS, MessageException
+
+PENDING = Constant("PENDING")
+ACCEPTED = Constant("ACCEPTED")
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
+MODIFIED = Constant("MODIFIED")
+ABORTED = Constant("ABORTED")
+SETTLED = Constant("SETTLED")
+
+STATUSES = {
+    PN_STATUS_ABORTED: ABORTED,
+    PN_STATUS_ACCEPTED: ACCEPTED,
+    PN_STATUS_REJECTED: REJECTED,
+    PN_STATUS_RELEASED: RELEASED,
+    PN_STATUS_MODIFIED: MODIFIED,
+    PN_STATUS_PENDING: PENDING,
+    PN_STATUS_SETTLED: SETTLED,
+    PN_STATUS_UNKNOWN: None
+}
+
+
+class Message(object):
+    """The L{Message} class is a mutable holder of message content.
+
+    @ivar instructions: delivery instructions for the message
+    @type instructions: dict
+    @ivar annotations: infrastructure defined message annotations
+    @type annotations: dict
+    @ivar properties: application defined message properties
+    @type properties: dict
+    @ivar body: message body
+    @type body: bytes | unicode | dict | list | int | long | float | UUID
+    """
+
+    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
+
+    def __init__(self, body=None, **kwargs):
+        """
+        @param kwargs: Message property name/value pairs to initialise the Message
+        """
+        self._msg = pn_message()
+        self._id = Data(pn_message_id(self._msg))
+        self._correlation_id = Data(pn_message_correlation_id(self._msg))
+        self.instructions = None
+        self.annotations = None
+        self.properties = None
+        self.body = body
+        for k, v in _compat.iteritems(kwargs):
+            getattr(self, k)  # Raise exception if it's not a valid attribute.
+            setattr(self, k, v)
+
+    def __del__(self):
+        if hasattr(self, "_msg"):
+            pn_message_free(self._msg)
+            del self._msg
+
+    def _check(self, err):
+        if err < 0:
+            exc = EXCEPTIONS.get(err, MessageException)
+            raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
+        else:
+            return err
+
+    def _check_property_keys(self):
+        for k in self.properties.keys():
+            if isinstance(k, unicode):
+                # py2 unicode, py3 str (via hack definition)
+                continue
+            # If key is binary then change to string
+            elif isinstance(k, str):
+                # py2 str
+                self.properties[k.encode('utf-8')] = self.properties.pop(k)
+            else:
+                raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
+
+    def _pre_encode(self):
+        inst = Data(pn_message_instructions(self._msg))
+        ann = Data(pn_message_annotations(self._msg))
+        props = Data(pn_message_properties(self._msg))
+        body = Data(pn_message_body(self._msg))
+
+        inst.clear()
+        if self.instructions is not None:
+            inst.put_object(self.instructions)
+        ann.clear()
+        if self.annotations is not None:
+            ann.put_object(self.annotations)
+        props.clear()
+        if self.properties is not None:
+            self._check_property_keys()
+            props.put_object(self.properties)
+        body.clear()
+        if self.body is not None:
+            body.put_object(self.body)
+
+    def _post_decode(self):
+        inst = Data(pn_message_instructions(self._msg))
+        ann = Data(pn_message_annotations(self._msg))
+        props = Data(pn_message_properties(self._msg))
+        body = Data(pn_message_body(self._msg))
+
+        if inst.next():
+            self.instructions = inst.get_object()
+        else:
+            self.instructions = None
+        if ann.next():
+            self.annotations = ann.get_object()
+        else:
+            self.annotations = None
+        if props.next():
+            self.properties = props.get_object()
+        else:
+            self.properties = None
+        if body.next():
+            self.body = body.get_object()
+        else:
+            self.body = None
+
+    def clear(self):
+        """
+        Clears the contents of the L{Message}. All fields will be reset to
+        their default values.
+        """
+        pn_message_clear(self._msg)
+        self.instructions = None
+        self.annotations = None
+        self.properties = None
+        self.body = None
+
+    def _is_inferred(self):
+        return pn_message_is_inferred(self._msg)
+
+    def _set_inferred(self, value):
+        self._check(pn_message_set_inferred(self._msg, bool(value)))
+
+    inferred = property(_is_inferred, _set_inferred, doc="""
+The inferred flag for a message indicates how the message content
+is encoded into AMQP sections. If inferred is true then binary and
+list values in the body of the message will be encoded as AMQP DATA
+and AMQP SEQUENCE sections, respectively. If inferred is false,
+then all values in the body of the message will be encoded as AMQP
+VALUE sections regardless of their type.
+""")
+
+    def _is_durable(self):
+        return pn_message_is_durable(self._msg)
+
+    def _set_durable(self, value):
+        self._check(pn_message_set_durable(self._msg, bool(value)))
+
+    durable = property(_is_durable, _set_durable,
+                       doc="""
+The durable property indicates that the message should be held durably
+by any intermediaries taking responsibility for the message.
+""")
+
+    def _get_priority(self):
+        return pn_message_get_priority(self._msg)
+
+    def _set_priority(self, value):
+        self._check(pn_message_set_priority(self._msg, value))
+
+    priority = property(_get_priority, _set_priority,
+                        doc="""
+The priority of the message.
+""")
+
+    def _get_ttl(self):
+        return millis2secs(pn_message_get_ttl(self._msg))
+
+    def _set_ttl(self, value):
+        self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
+
+    ttl = property(_get_ttl, _set_ttl,
+                   doc="""
+The time to live of the message measured in seconds. Expired messages
+may be dropped.
+""")
+
+    def _is_first_acquirer(self):
+        return pn_message_is_first_acquirer(self._msg)
+
+    def _set_first_acquirer(self, value):
+        self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
+
+    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
+                              doc="""
+True iff the recipient is the first to acquire the message.
+""")
+
+    def _get_delivery_count(self):
+        return pn_message_get_delivery_count(self._msg)
+
+    def _set_delivery_count(self, value):
+        self._check(pn_message_set_delivery_count(self._msg, value))
+
+    delivery_count = property(_get_delivery_count, _set_delivery_count,
+                              doc="""
+The number of delivery attempts made for this message.
+""")
+
+    def _get_id(self):
+        return self._id.get_object()
+
+    def _set_id(self, value):
+        if isinteger(value):
+            value = ulong(value)
+        self._id.rewind()
+        self._id.put_object(value)
+
+    id = property(_get_id, _set_id,
+                  doc="""
+The id of the message.
+""")
+
+    def _get_user_id(self):
+        return pn_message_get_user_id(self._msg)
+
+    def _set_user_id(self, value):
+        self._check(pn_message_set_user_id(self._msg, value))
+
+    user_id = property(_get_user_id, _set_user_id,
+                       doc="""
+The user id of the message creator.
+""")
+
+    def _get_address(self):
+        return utf82unicode(pn_message_get_address(self._msg))
+
+    def _set_address(self, value):
+        self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
+
+    address = property(_get_address, _set_address,
+                       doc="""
+The address of the message.
+""")
+
+    def _get_subject(self):
+        return utf82unicode(pn_message_get_subject(self._msg))
+
+    def _set_subject(self, value):
+        self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
+
+    subject = property(_get_subject, _set_subject,
+                       doc="""
+The subject of the message.
+""")
+
+    def _get_reply_to(self):
+        return utf82unicode(pn_message_get_reply_to(self._msg))
+
+    def _set_reply_to(self, value):
+        self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
+
+    reply_to = property(_get_reply_to, _set_reply_to,
+                        doc="""
+The reply-to address for the message.
+""")
+
+    def _get_correlation_id(self):
+        return self._correlation_id.get_object()
+
+    def _set_correlation_id(self, value):
+        if isinteger(value):
+            value = ulong(value)
+        self._correlation_id.rewind()
+        self._correlation_id.put_object(value)
+
+    correlation_id = property(_get_correlation_id, _set_correlation_id,
+                              doc="""
+The correlation-id for the message.
+""")
+
+    def _get_content_type(self):
+        return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
+
+    def _set_content_type(self, value):
+        self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
+
+    content_type = property(_get_content_type, _set_content_type,
+                            doc="""
+The content-type of the message.
+""")
+
+    def _get_content_encoding(self):
+        return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
+
+    def _set_content_encoding(self, value):
+        self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
+
+    content_encoding = property(_get_content_encoding, _set_content_encoding,
+                                doc="""
+The content-encoding of the message.
+""")
+
+    def _get_expiry_time(self):
+        return millis2secs(pn_message_get_expiry_time(self._msg))
+
+    def _set_expiry_time(self, value):
+        self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
+
+    expiry_time = property(_get_expiry_time, _set_expiry_time,
+                           doc="""
+The expiry time of the message.
+""")
+
+    def _get_creation_time(self):
+        return millis2secs(pn_message_get_creation_time(self._msg))
+
+    def _set_creation_time(self, value):
+        self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
+
+    creation_time = property(_get_creation_time, _set_creation_time,
+                             doc="""
+The creation time of the message.
+""")
+
+    def _get_group_id(self):
+        return utf82unicode(pn_message_get_group_id(self._msg))
+
+    def _set_group_id(self, value):
+        self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
+
+    group_id = property(_get_group_id, _set_group_id,
+                        doc="""
+The group id of the message.
+""")
+
+    def _get_group_sequence(self):
+        return pn_message_get_group_sequence(self._msg)
+
+    def _set_group_sequence(self, value):
+        self._check(pn_message_set_group_sequence(self._msg, value))
+
+    group_sequence = property(_get_group_sequence, _set_group_sequence,
+                              doc="""
+The sequence of the message within its group.
+""")
+
+    def _get_reply_to_group_id(self):
+        return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
+
+    def _set_reply_to_group_id(self, value):
+        self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
+
+    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
+                                 doc="""
+The group-id for any replies.
+""")
+
+    def encode(self):
+        self._pre_encode()
+        sz = 16
+        while True:
+            err, data = pn_message_encode(self._msg, sz)
+            if err == PN_OVERFLOW:
+                sz *= 2
+                continue
+            else:
+                self._check(err)
+                return data
+
+    def decode(self, data):
+        self._check(pn_message_decode(self._msg, data))
+        self._post_decode()
+
+    def send(self, sender, tag=None):
+        dlv = sender.delivery(tag or sender.delivery_tag())
+        encoded = self.encode()
+        sender.stream(encoded)
+        sender.advance()
+        if sender.snd_settle_mode == Link.SND_SETTLED:
+            dlv.settle()
+        return dlv
+
+    def recv(self, link):
+        """
+        Receives and decodes the message content for the current delivery
+        from the link. Upon success it will return the current delivery
+        for the link. If there is no current delivery, or if the current
+        delivery is incomplete, or if the link is not a receiver, it will
+        return None.
+
+        @type link: Link
+        @param link: the link to receive a message from
+        @return the delivery associated with the decoded message (or None)
+
+        """
+        if link.is_sender: return None
+        dlv = link.current
+        if not dlv or dlv.partial: return None
+        dlv.encoded = link.recv(dlv.pending)
+        link.advance()
+        # the sender has already forgotten about the delivery, so we might
+        # as well too
+        if link.remote_snd_settle_mode == Link.SND_SETTLED:
+            dlv.settle()
+        self.decode(dlv.encoded)
+        return dlv
+
+    def __repr2__(self):
+        props = []
+        for attr in ("inferred", "address", "reply_to", "durable", "ttl",
+                     "priority", "first_acquirer", "delivery_count", "id",
+                     "correlation_id", "user_id", "group_id", "group_sequence",
+                     "reply_to_group_id", "instructions", "annotations",
+                     "properties", "body"):
+            value = getattr(self, attr)
+            if value: props.append("%s=%r" % (attr, value))
+        return "Message(%s)" % ", ".join(props)
+
+    def __repr__(self):
+        tmp = pn_string(None)
+        err = pn_inspect(self._msg, tmp)
+        result = pn_string_get(tmp)
+        pn_free(tmp)
+        self._check(err)
+        return result

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_reactor_impl.py
----------------------------------------------------------------------
diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py
new file mode 100644
index 0000000..39986ff
--- /dev/null
+++ b/python/proton/_reactor_impl.py
@@ -0,0 +1,217 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import weakref
+
+from cproton import PN_INVALID_SOCKET, \
+    pn_incref, pn_decref, \
+    pn_handler_add, pn_handler_clear, pn_pyhandler, \
+    pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \
+    pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \
+    pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \
+    pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \
+    pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd
+
+from ._common import millis2secs, secs2millis
+from ._wrapper import Wrapper
+
+from . import _compat
+
+_DEFAULT = object()
+
+
+class Selectable(Wrapper):
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            return Selectable(impl)
+
+    def __init__(self, impl):
+        Wrapper.__init__(self, impl, pn_selectable_attachments)
+
+    def _init(self):
+        pass
+
+    def fileno(self, fd=_DEFAULT):
+        if fd is _DEFAULT:
+            return pn_selectable_get_fd(self._impl)
+        elif fd is None:
+            pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
+        else:
+            pn_selectable_set_fd(self._impl, fd)
+
+    def _is_reading(self):
+        return pn_selectable_is_reading(self._impl)
+
+    def _set_reading(self, val):
+        pn_selectable_set_reading(self._impl, bool(val))
+
+    reading = property(_is_reading, _set_reading)
+
+    def _is_writing(self):
+        return pn_selectable_is_writing(self._impl)
+
+    def _set_writing(self, val):
+        pn_selectable_set_writing(self._impl, bool(val))
+
+    writing = property(_is_writing, _set_writing)
+
+    def _get_deadline(self):
+        tstamp = pn_selectable_get_deadline(self._impl)
+        if tstamp:
+            return millis2secs(tstamp)
+        else:
+            return None
+
+    def _set_deadline(self, deadline):
+        pn_selectable_set_deadline(self._impl, secs2millis(deadline))
+
+    deadline = property(_get_deadline, _set_deadline)
+
+    def readable(self):
+        pn_selectable_readable(self._impl)
+
+    def writable(self):
+        pn_selectable_writable(self._impl)
+
+    def expired(self):
+        pn_selectable_expired(self._impl)
+
+    def _is_registered(self):
+        return pn_selectable_is_registered(self._impl)
+
+    def _set_registered(self, registered):
+        pn_selectable_set_registered(self._impl, registered)
+
+    registered = property(_is_registered, _set_registered,
+                          doc="""
+The registered property may be get/set by an I/O polling system to
+indicate whether the fd has been registered or not.
+""")
+
+    @property
+    def is_terminal(self):
+        return pn_selectable_is_terminal(self._impl)
+
+    def terminate(self):
+        pn_selectable_terminate(self._impl)
+
+    def release(self):
+        pn_selectable_release(self._impl)
+
+
+class _cadapter:
+
+    def __init__(self, handler, on_error=None):
+        self.handler = handler
+        self.on_error = on_error
+
+    def dispatch(self, cevent, ctype):
+        from ._events import Event
+        ev = Event.wrap(cevent, ctype)
+        ev.dispatch(self.handler)
+
+    def exception(self, exc, val, tb):
+        if self.on_error is None:
+            _compat.raise_(exc, val, tb)
+        else:
+            self.on_error((exc, val, tb))
+
+
+class WrappedHandlersChildSurrogate:
+    def __init__(self, delegate):
+        self.handlers = []
+        self.delegate = weakref.ref(delegate)
+
+    def on_unhandled(self, method, event):
+        from ._events import dispatch
+        delegate = self.delegate()
+        if delegate:
+            dispatch(delegate, method, event)
+
+
+class WrappedHandlersProperty(object):
+    def __get__(self, obj, clazz):
+        if obj is None:
+            return None
+        return self.surrogate(obj).handlers
+
+    def __set__(self, obj, value):
+        self.surrogate(obj).handlers = value
+
+    def surrogate(self, obj):
+        key = "_surrogate"
+        objdict = obj.__dict__
+        surrogate = objdict.get(key, None)
+        if surrogate is None:
+            objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
+            obj.add(surrogate)
+        return surrogate
+
+
+class WrappedHandler(Wrapper):
+    handlers = WrappedHandlersProperty()
+
+    @classmethod
+    def wrap(cls, impl, on_error=None):
+        if impl is None:
+            return None
+        else:
+            handler = cls(impl)
+            handler.__dict__["on_error"] = on_error
+            return handler
+
+    def __init__(self, impl_or_constructor):
+        Wrapper.__init__(self, impl_or_constructor)
+        if list(self.__class__.__mro__).index(WrappedHandler) > 1:
+            # instantiate the surrogate
+            self.handlers.extend([])
+
+    def _on_error(self, info):
+        on_error = getattr(self, "on_error", None)
+        if on_error is None:
+            _compat.raise_(info[0], info[1], info[2])
+        else:
+            on_error(info)
+
+    def add(self, handler, on_error=None):
+        if handler is None: return
+        if on_error is None: on_error = self._on_error
+        impl = _chandler(handler, on_error)
+        pn_handler_add(self._impl, impl)
+        pn_decref(impl)
+
+    def clear(self):
+        pn_handler_clear(self._impl)
+
+
+def _chandler(obj, on_error=None):
+    if obj is None:
+        return None
+    elif isinstance(obj, WrappedHandler):
+        impl = obj._impl
+        pn_incref(impl)
+        return impl
+    else:
+        return pn_pyhandler(_cadapter(obj, on_error))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_transport.py
----------------------------------------------------------------------
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
new file mode 100644
index 0000000..3db0078
--- /dev/null
+++ b/python/proton/_transport.py
@@ -0,0 +1,524 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from cproton import PN_SASL_AUTH, PN_SASL_PERM, PN_SASL_SYS, PN_SSL_RESUME_REUSED, PN_SASL_NONE, PN_SSL_SHA1, \
+    PN_SSL_CERT_SUBJECT_COUNTRY_NAME, PN_SASL_OK, PN_SSL_RESUME_UNKNOWN, PN_EOS, PN_SSL_ANONYMOUS_PEER, PN_SSL_MD5, \
+    PN_SSL_CERT_SUBJECT_COMMON_NAME, PN_SSL_VERIFY_PEER, PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY, PN_SSL_MODE_SERVER, \
+    PN_TRACE_DRV, PN_TRACE_RAW, pn_transport, PN_SSL_SHA256, PN_TRACE_FRM, PN_SSL_MODE_CLIENT, PN_SASL_TEMP, \
+    PN_SSL_SHA512, PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT, PN_OK, PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE, \
+    PN_SSL_VERIFY_PEER_NAME, PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME, PN_SSL_RESUME_NEW, PN_TRACE_OFF, \
+    pn_transport_get_channel_max, pn_transport_capacity, pn_transport_push, pn_transport_get_user, pn_transport_tick, \
+    pn_transport_set_max_frame, pn_transport_attachments, pn_transport_unbind, pn_transport_peek, \
+    pn_transport_set_channel_max, pn_transport_close_tail, pn_transport_condition, pn_transport_is_encrypted, \
+    pn_transport_get_frames_input, pn_transport_bind, pn_transport_closed, pn_transport_get_idle_timeout, \
+    pn_transport_get_remote_idle_timeout, pn_transport_get_frames_output, pn_transport_pending, \
+    pn_transport_set_pytracer, pn_transport_close_head, pn_transport_get_remote_max_frame, \
+    pn_transport_is_authenticated, pn_transport_set_idle_timeout, pn_transport_log, pn_transport_get_pytracer, \
+    pn_transport_require_auth, pn_transport_get_max_frame, pn_transport_set_server, pn_transport_remote_channel_max, \
+    pn_transport_require_encryption, pn_transport_pop, pn_transport_connection, \
+    pn_sasl, pn_sasl_set_allow_insecure_mechs, pn_sasl_outcome, pn_transport_error, pn_sasl_get_user, \
+    pn_sasl_extended, pn_sasl_done, pn_sasl_get_allow_insecure_mechs, pn_sasl_allowed_mechs, \
+    pn_sasl_config_name, pn_sasl_config_path, \
+    pn_ssl, pn_ssl_init, pn_ssl_domain_allow_unsecured_client, pn_ssl_domain_free, \
+    pn_ssl_domain, pn_transport_trace, pn_ssl_resume_status, pn_sasl_get_mech, \
+    pn_ssl_domain_set_trusted_ca_db, pn_ssl_get_remote_subject_subfield, pn_ssl_present, \
+    pn_ssl_get_remote_subject, pn_ssl_domain_set_credentials, pn_ssl_domain_set_peer_authentication, \
+    pn_ssl_get_peer_hostname, pn_ssl_set_peer_hostname, pn_ssl_get_cipher_name, pn_ssl_get_cert_fingerprint, \
+    pn_ssl_get_protocol_name, \
+    pn_error_text
+
+from ._common import millis2secs, secs2millis, unicode2utf8, utf82unicode
+from ._condition import cond2obj
+from ._exceptions import EXCEPTIONS, TransportException, SessionException, SSLException, SSLUnavailable
+from ._wrapper import Wrapper
+
+
+class TraceAdapter:
+
+    def __init__(self, tracer):
+        self.tracer = tracer
+
+    def __call__(self, trans_impl, message):
+        self.tracer(Transport.wrap(trans_impl), message)
+
+
+class Transport(Wrapper):
+    TRACE_OFF = PN_TRACE_OFF
+    TRACE_DRV = PN_TRACE_DRV
+    TRACE_FRM = PN_TRACE_FRM
+    TRACE_RAW = PN_TRACE_RAW
+
+    CLIENT = 1
+    SERVER = 2
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            return Transport(_impl=impl)
+
+    def __init__(self, mode=None, _impl=pn_transport):
+        Wrapper.__init__(self, _impl, pn_transport_attachments)
+        if mode == Transport.SERVER:
+            pn_transport_set_server(self._impl)
+        elif mode is None or mode == Transport.CLIENT:
+            pass
+        else:
+            raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
+
+    def _init(self):
+        self._sasl = None
+        self._ssl = None
+
+    def _check(self, err):
+        if err < 0:
+            exc = EXCEPTIONS.get(err, TransportException)
+            raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
+        else:
+            return err
+
+    def _set_tracer(self, tracer):
+        pn_transport_set_pytracer(self._impl, TraceAdapter(tracer))
+
+    def _get_tracer(self):
+        adapter = pn_transport_get_pytracer(self._impl)
+        if adapter:
+            return adapter.tracer
+        else:
+            return None
+
+    tracer = property(_get_tracer, _set_tracer,
+                      doc="""
+A callback for trace logging. The callback is passed the transport and log message.
+""")
+
+    def log(self, message):
+        pn_transport_log(self._impl, message)
+
+    def require_auth(self, bool):
+        pn_transport_require_auth(self._impl, bool)
+
+    @property
+    def authenticated(self):
+        return pn_transport_is_authenticated(self._impl)
+
+    def require_encryption(self, bool):
+        pn_transport_require_encryption(self._impl, bool)
+
+    @property
+    def encrypted(self):
+        return pn_transport_is_encrypted(self._impl)
+
+    @property
+    def user(self):
+        return pn_transport_get_user(self._impl)
+
+    def bind(self, connection):
+        """Assign a connection to the transport"""
+        self._check(pn_transport_bind(self._impl, connection._impl))
+
+    def unbind(self):
+        """Release the connection"""
+        self._check(pn_transport_unbind(self._impl))
+
+    def trace(self, n):
+        pn_transport_trace(self._impl, n)
+
+    def tick(self, now):
+        """Process any timed events (like heartbeat generation).
+        now = seconds since epoch (float).
+        """
+        return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
+
+    def capacity(self):
+        c = pn_transport_capacity(self._impl)
+        if c >= PN_EOS:
+            return c
+        else:
+            return self._check(c)
+
+    def push(self, binary):
+        n = self._check(pn_transport_push(self._impl, binary))
+        if n != len(binary):
+            raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
+
+    def close_tail(self):
+        self._check(pn_transport_close_tail(self._impl))
+
+    def pending(self):
+        p = pn_transport_pending(self._impl)
+        if p >= PN_EOS:
+            return p
+        else:
+            return self._check(p)
+
+    def peek(self, size):
+        cd, out = pn_transport_peek(self._impl, size)
+        if cd == PN_EOS:
+            return None
+        else:
+            self._check(cd)
+            return out
+
+    def pop(self, size):
+        pn_transport_pop(self._impl, size)
+
+    def close_head(self):
+        self._check(pn_transport_close_head(self._impl))
+
+    @property
+    def closed(self):
+        return pn_transport_closed(self._impl)
+
+    # AMQP 1.0 max-frame-size
+    def _get_max_frame_size(self):
+        return pn_transport_get_max_frame(self._impl)
+
+    def _set_max_frame_size(self, value):
+        pn_transport_set_max_frame(self._impl, value)
+
+    max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
+                              doc="""
+Sets the maximum size for received frames (in bytes).
+""")
+
+    @property
+    def remote_max_frame_size(self):
+        return pn_transport_get_remote_max_frame(self._impl)
+
+    def _get_channel_max(self):
+        return pn_transport_get_channel_max(self._impl)
+
+    def _set_channel_max(self, value):
+        if pn_transport_set_channel_max(self._impl, value):
+            raise SessionException("Too late to change channel max.")
+
+    channel_max = property(_get_channel_max, _set_channel_max,
+                           doc="""
+Sets the maximum channel that may be used on the transport.
+""")
+
+    @property
+    def remote_channel_max(self):
+        return pn_transport_remote_channel_max(self._impl)
+
+    # AMQP 1.0 idle-time-out
+    def _get_idle_timeout(self):
+        return millis2secs(pn_transport_get_idle_timeout(self._impl))
+
+    def _set_idle_timeout(self, sec):
+        pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
+
+    idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
+                            doc="""
+The idle timeout of the connection (float, in seconds).
+""")
+
+    @property
+    def remote_idle_timeout(self):
+        return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
+
+    @property
+    def frames_output(self):
+        return pn_transport_get_frames_output(self._impl)
+
+    @property
+    def frames_input(self):
+        return pn_transport_get_frames_input(self._impl)
+
+    def sasl(self):
+        return SASL(self)
+
+    def ssl(self, domain=None, session_details=None):
+        # SSL factory (singleton for this transport)
+        if not self._ssl:
+            self._ssl = SSL(self, domain, session_details)
+        return self._ssl
+
+    @property
+    def condition(self):
+        return cond2obj(pn_transport_condition(self._impl))
+
+    @property
+    def connection(self):
+        from . import _endpoints
+        return _endpoints.Connection.wrap(pn_transport_connection(self._impl))
+
+
+class SASLException(TransportException):
+    pass
+
+
+class SASL(Wrapper):
+    OK = PN_SASL_OK
+    AUTH = PN_SASL_AUTH
+    SYS = PN_SASL_SYS
+    PERM = PN_SASL_PERM
+    TEMP = PN_SASL_TEMP
+
+    @staticmethod
+    def extended():
+        return pn_sasl_extended()
+
+    def __init__(self, transport):
+        Wrapper.__init__(self, transport._impl, pn_transport_attachments)
+        self._sasl = pn_sasl(transport._impl)
+
+    def _check(self, err):
+        if err < 0:
+            exc = EXCEPTIONS.get(err, SASLException)
+            raise exc("[%s]" % (err))
+        else:
+            return err
+
+    @property
+    def user(self):
+        return pn_sasl_get_user(self._sasl)
+
+    @property
+    def mech(self):
+        return pn_sasl_get_mech(self._sasl)
+
+    @property
+    def outcome(self):
+        outcome = pn_sasl_outcome(self._sasl)
+        if outcome == PN_SASL_NONE:
+            return None
+        else:
+            return outcome
+
+    def allowed_mechs(self, mechs):
+        pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
+
+    def _get_allow_insecure_mechs(self):
+        return pn_sasl_get_allow_insecure_mechs(self._sasl)
+
+    def _set_allow_insecure_mechs(self, insecure):
+        pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
+
+    allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
+                                    doc="""
+Allow unencrypted cleartext passwords (PLAIN mech)
+""")
+
+    def done(self, outcome):
+        pn_sasl_done(self._sasl, outcome)
+
+    def config_name(self, name):
+        pn_sasl_config_name(self._sasl, name)
+
+    def config_path(self, path):
+        pn_sasl_config_path(self._sasl, path)
+
+
+class SSLDomain(object):
+    MODE_CLIENT = PN_SSL_MODE_CLIENT
+    MODE_SERVER = PN_SSL_MODE_SERVER
+    VERIFY_PEER = PN_SSL_VERIFY_PEER
+    VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
+    ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
+
+    def __init__(self, mode):
+        self._domain = pn_ssl_domain(mode)
+        if self._domain is None:
+            raise SSLUnavailable()
+
+    def _check(self, err):
+        if err < 0:
+            exc = EXCEPTIONS.get(err, SSLException)
+            raise exc("SSL failure.")
+        else:
+            return err
+
+    def set_credentials(self, cert_file, key_file, password):
+        return self._check(pn_ssl_domain_set_credentials(self._domain,
+                                                         cert_file, key_file,
+                                                         password))
+
+    def set_trusted_ca_db(self, certificate_db):
+        return self._check(pn_ssl_domain_set_trusted_ca_db(self._domain,
+                                                           certificate_db))
+
+    def set_peer_authentication(self, verify_mode, trusted_CAs=None):
+        return self._check(pn_ssl_domain_set_peer_authentication(self._domain,
+                                                                 verify_mode,
+                                                                 trusted_CAs))
+
+    def allow_unsecured_client(self):
+        return self._check(pn_ssl_domain_allow_unsecured_client(self._domain))
+
+    def __del__(self):
+        pn_ssl_domain_free(self._domain)
+
+
+class SSL(object):
+
+    @staticmethod
+    def present():
+        return pn_ssl_present()
+
+    def _check(self, err):
+        if err < 0:
+            exc = EXCEPTIONS.get(err, SSLException)
+            raise exc("SSL failure.")
+        else:
+            return err
+
+    def __new__(cls, transport, domain, session_details=None):
+        """Enforce a singleton SSL object per Transport"""
+        if transport._ssl:
+            # unfortunately, we've combined the allocation and the configuration in a
+            # single step.  So catch any attempt by the application to provide what
+            # may be a different configuration than the original (hack)
+            ssl = transport._ssl
+            if (domain and (ssl._domain is not domain) or
+                    session_details and (ssl._session_details is not session_details)):
+                raise SSLException("Cannot re-configure existing SSL object!")
+        else:
+            obj = super(SSL, cls).__new__(cls)
+            obj._domain = domain
+            obj._session_details = session_details
+            session_id = None
+            if session_details:
+                session_id = session_details.get_session_id()
+            obj._ssl = pn_ssl(transport._impl)
+            if obj._ssl is None:
+                raise SSLUnavailable()
+            if domain:
+                pn_ssl_init(obj._ssl, domain._domain, session_id)
+            transport._ssl = obj
+        return transport._ssl
+
+    def cipher_name(self):
+        rc, name = pn_ssl_get_cipher_name(self._ssl, 128)
+        if rc:
+            return name
+        return None
+
+    def protocol_name(self):
+        rc, name = pn_ssl_get_protocol_name(self._ssl, 128)
+        if rc:
+            return name
+        return None
+
+    SHA1 = PN_SSL_SHA1
+    SHA256 = PN_SSL_SHA256
+    SHA512 = PN_SSL_SHA512
+    MD5 = PN_SSL_MD5
+
+    CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
+    CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
+    CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
+    CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
+    CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
+    CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
+
+    def get_cert_subject_subfield(self, subfield_name):
+        subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
+        return subfield_value
+
+    def get_cert_subject(self):
+        subject = pn_ssl_get_remote_subject(self._ssl)
+        return subject
+
+    def _get_cert_subject_unknown_subfield(self):
+        # Pass in an unhandled enum
+        return self.get_cert_subject_subfield(10)
+
+    # Convenience functions for obtaining the subfields of the subject field.
+    def get_cert_common_name(self):
+        return self.get_cert_subject_subfield(SSL.CERT_COMMON_NAME)
+
+    def get_cert_organization(self):
+        return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_NAME)
+
+    def get_cert_organization_unit(self):
+        return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_UNIT)
+
+    def get_cert_locality_or_city(self):
+        return self.get_cert_subject_subfield(SSL.CERT_CITY_OR_LOCALITY)
+
+    def get_cert_country(self):
+        return self.get_cert_subject_subfield(SSL.CERT_COUNTRY_NAME)
+
+    def get_cert_state_or_province(self):
+        return self.get_cert_subject_subfield(SSL.CERT_STATE_OR_PROVINCE)
+
+    def get_cert_fingerprint(self, fingerprint_length, digest_name):
+        rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
+        if rc == PN_OK:
+            return fingerprint_str
+        return None
+
+    # Convenience functions for obtaining fingerprint for specific hashing algorithms
+    def _get_cert_fingerprint_unknown_hash_alg(self):
+        return self.get_cert_fingerprint(41, 10)
+
+    def get_cert_fingerprint_sha1(self):
+        return self.get_cert_fingerprint(41, SSL.SHA1)
+
+    def get_cert_fingerprint_sha256(self):
+        # sha256 produces a fingerprint that is 64 characters long
+        return self.get_cert_fingerprint(65, SSL.SHA256)
+
+    def get_cert_fingerprint_sha512(self):
+        # sha512 produces a fingerprint that is 128 characters long
+        return self.get_cert_fingerprint(129, SSL.SHA512)
+
+    def get_cert_fingerprint_md5(self):
+        return self.get_cert_fingerprint(33, SSL.MD5)
+
+    @property
+    def remote_subject(self):
+        return pn_ssl_get_remote_subject(self._ssl)
+
+    RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
+    RESUME_NEW = PN_SSL_RESUME_NEW
+    RESUME_REUSED = PN_SSL_RESUME_REUSED
+
+    def resume_status(self):
+        return pn_ssl_resume_status(self._ssl)
+
+    def _set_peer_hostname(self, hostname):
+        self._check(pn_ssl_set_peer_hostname(self._ssl, unicode2utf8(hostname)))
+
+    def _get_peer_hostname(self):
+        err, name = pn_ssl_get_peer_hostname(self._ssl, 1024)
+        self._check(err)
+        return utf82unicode(name)
+
+    peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
+                             doc="""
+Manage the expected name of the remote peer.  Used to authenticate the remote.
+""")
+
+
+class SSLSessionDetails(object):
+    """ Unique identifier for the SSL session.  Used to resume previous session on a new
+    SSL connection.
+    """
+
+    def __init__(self, session_id):
+        self._session_id = session_id
+
+    def get_session_id(self):
+        return self._session_id

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_url.py
----------------------------------------------------------------------
diff --git a/python/proton/_url.py b/python/proton/_url.py
new file mode 100644
index 0000000..b4a9a6a
--- /dev/null
+++ b/python/proton/_url.py
@@ -0,0 +1,161 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import socket
+
+from cproton import pn_url, pn_url_free, pn_url_parse, pn_url_str, pn_url_get_port, pn_url_get_scheme, \
+    pn_url_get_host, pn_url_get_username, pn_url_get_password, pn_url_get_path, pn_url_set_scheme, pn_url_set_host, \
+    pn_url_set_username, pn_url_set_password, pn_url_set_port, pn_url_set_path
+
+from ._common import unicode2utf8
+
+
+class Url(object):
+    """
+    Simple URL parser/constructor, handles URLs of the form:
+
+    <scheme>://<user>:<password>@<host>:<port>/<path>
+
+    All components can be None if not specified in the URL string.
+
+    The port can be specified as a service name, e.g. 'amqp' in the
+    URL string but Url.port always gives the integer value.
+
+    Warning: The placement of user and password in URLs is not
+    recommended.  It can result in credentials leaking out in program
+    logs.  Use connection configuration attributes instead.
+
+    @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
+    @ivar user: Username
+    @ivar password: Password
+    @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
+    @ivar port: Integer port.
+    @ivar host_port: Returns host:port
+    """
+
+    AMQPS = "amqps"
+    AMQP = "amqp"
+
+    class Port(int):
+        """An integer port number that can be constructed from a service name string"""
+
+        def __new__(cls, value):
+            """@param value: integer port number or string service name."""
+            port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
+            setattr(port, 'name', str(value))
+            return port
+
+        def __eq__(self, x):
+            return str(self) == x or int(self) == x
+
+        def __ne__(self, x):
+            return not self == x
+
+        def __str__(self):
+            return str(self.name)
+
+        @staticmethod
+        def _port_int(value):
+            """Convert service, an integer or a service name, into an integer port number."""
+            try:
+                return int(value)
+            except ValueError:
+                try:
+                    return socket.getservbyname(value)
+                except socket.error:
+                    # Not every system has amqp/amqps defined as a service
+                    if value == Url.AMQPS:
+                        return 5671
+                    elif value == Url.AMQP:
+                        return 5672
+                    else:
+                        raise ValueError("Not a valid port number or service name: '%s'" % value)
+
+    def __init__(self, url=None, defaults=True, **kwargs):
+        """
+        @param url: URL string to parse.
+        @param defaults: If true, fill in missing default values in the URL.
+            If false, you can fill them in later by calling self.defaults()
+        @param kwargs: scheme, user, password, host, port, path.
+          If specified, replaces corresponding part in url string.
+        """
+        if url:
+            self._url = pn_url_parse(unicode2utf8(str(url)))
+            if not self._url: raise ValueError("Invalid URL '%s'" % url)
+        else:
+            self._url = pn_url()
+        for k in kwargs:  # Let kwargs override values parsed from url
+            getattr(self, k)  # Check for invalid kwargs
+            setattr(self, k, kwargs[k])
+        if defaults: self.defaults()
+
+    class PartDescriptor(object):
+        def __init__(self, part):
+            self.getter = globals()["pn_url_get_%s" % part]
+            self.setter = globals()["pn_url_set_%s" % part]
+
+        def __get__(self, obj, type=None): return self.getter(obj._url)
+
+        def __set__(self, obj, value): return self.setter(obj._url, str(value))
+
+    scheme = PartDescriptor('scheme')
+    username = PartDescriptor('username')
+    password = PartDescriptor('password')
+    host = PartDescriptor('host')
+    path = PartDescriptor('path')
+
+    def _get_port(self):
+        portstr = pn_url_get_port(self._url)
+        return portstr and Url.Port(portstr)
+
+    def _set_port(self, value):
+        if value is None:
+            pn_url_set_port(self._url, None)
+        else:
+            pn_url_set_port(self._url, str(Url.Port(value)))
+
+    port = property(_get_port, _set_port)
+
+    def __str__(self):
+        return pn_url_str(self._url)
+
+    def __repr__(self):
+        return "Url(%s://%s/%s)" % (self.scheme, self.host, self.path)
+
+    def __eq__(self, x):
+        return str(self) == str(x)
+
+    def __ne__(self, x):
+        return not self == x
+
+    def __del__(self):
+        pn_url_free(self._url)
+        del self._url
+
+    def defaults(self):
+        """
+        Fill in missing values (scheme, host or port) with defaults
+        @return: self
+        """
+        self.scheme = self.scheme or self.AMQP
+        self.host = self.host or '0.0.0.0'
+        self.port = self.port or self.Port(self.scheme)
+        return self

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_wrapper.py
----------------------------------------------------------------------
diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py
new file mode 100644
index 0000000..805ecb1
--- /dev/null
+++ b/python/proton/_wrapper.py
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from cproton import pn_incref, pn_decref, \
+    pn_py2void, pn_void2py, \
+    pn_record_get, pn_record_def, pn_record_set, \
+    PN_PYREF
+
+
+class EmptyAttrs:
+
+    def __contains__(self, name):
+        return False
+
+    def __getitem__(self, name):
+        raise KeyError(name)
+
+    def __setitem__(self, name, value):
+        raise TypeError("does not support item assignment")
+
+
+EMPTY_ATTRS = EmptyAttrs()
+
+
+class Wrapper(object):
+
+    def __init__(self, impl_or_constructor, get_context=None):
+        init = False
+        if callable(impl_or_constructor):
+            # we are constructing a new object
+            impl = impl_or_constructor()
+            if impl is None:
+                self.__dict__["_impl"] = impl
+                self.__dict__["_attrs"] = EMPTY_ATTRS
+                self.__dict__["_record"] = None
+                from proton import ProtonException
+                raise ProtonException(
+                    "Wrapper failed to create wrapped object. Check for file descriptor or memory exhaustion.")
+            init = True
+        else:
+            # we are wrapping an existing object
+            impl = impl_or_constructor
+            pn_incref(impl)
+
+        if get_context:
+            record = get_context(impl)
+            attrs = pn_void2py(pn_record_get(record, PYCTX))
+            if attrs is None:
+                attrs = {}
+                pn_record_def(record, PYCTX, PN_PYREF)
+                pn_record_set(record, PYCTX, pn_py2void(attrs))
+                init = True
+        else:
+            attrs = EMPTY_ATTRS
+            init = False
+            record = None
+        self.__dict__["_impl"] = impl
+        self.__dict__["_attrs"] = attrs
+        self.__dict__["_record"] = record
+        if init: self._init()
+
+    def __getattr__(self, name):
+        attrs = self.__dict__["_attrs"]
+        if name in attrs:
+            return attrs[name]
+        else:
+            raise AttributeError(name + " not in _attrs")
+
+    def __setattr__(self, name, value):
+        if hasattr(self.__class__, name):
+            object.__setattr__(self, name, value)
+        else:
+            attrs = self.__dict__["_attrs"]
+            attrs[name] = value
+
+    def __delattr__(self, name):
+        attrs = self.__dict__["_attrs"]
+        if attrs:
+            del attrs[name]
+
+    def __hash__(self):
+        return hash(addressof(self._impl))
+
+    def __eq__(self, other):
+        if isinstance(other, Wrapper):
+            return addressof(self._impl) == addressof(other._impl)
+        return False
+
+    def __ne__(self, other):
+        if isinstance(other, Wrapper):
+            return addressof(self._impl) != addressof(other._impl)
+        return True
+
+    def __del__(self):
+        pn_decref(self._impl)
+
+    def __repr__(self):
+        return '<%s.%s 0x%x ~ 0x%x>' % (self.__class__.__module__,
+                                        self.__class__.__name__,
+                                        id(self), addressof(self._impl))
+
+
+PYCTX = int(pn_py2void(Wrapper))
+addressof = int

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/python/proton/handlers.py b/python/proton/handlers.py
index 76c9e51..1e61f44 100644
--- a/python/proton/handlers.py
+++ b/python/proton/handlers.py
@@ -16,28 +16,35 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import heapq, logging, os, re, socket, time, types, weakref
 
-from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
-from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+from __future__ import absolute_import
+
+import logging
+import time
+import weakref
 from select import select
 
+from proton import Delivery, Endpoint
+from proton import Message, Handler, ProtonException
+from ._events import dispatch
+
 log = logging.getLogger("proton")
 
+
 class OutgoingMessageHandler(Handler):
     """
     A utility for simpler and more intuitive handling of delivery
     events related to outgoing i.e. sent messages.
     """
+
     def __init__(self, auto_settle=True, delegate=None):
         self.auto_settle = auto_settle
         self.delegate = delegate
 
     def on_link_flow(self, event):
         if event.link.is_sender and event.link.credit \
-           and event.link.state & Endpoint.LOCAL_ACTIVE \
-           and event.link.state & Endpoint.REMOTE_ACTIVE :
+                and event.link.state & Endpoint.LOCAL_ACTIVE \
+                and event.link.state & Endpoint.REMOTE_ACTIVE:
             self.on_sendable(event)
 
     def on_delivery(self, event):
@@ -94,23 +101,27 @@ class OutgoingMessageHandler(Handler):
         if self.delegate != None:
             dispatch(self.delegate, 'on_settled', event)
 
+
 def recv_msg(delivery):
     msg = Message()
     msg.decode(delivery.link.recv(delivery.pending))
     delivery.link.advance()
     return msg
 
+
 class Reject(ProtonException):
-  """
-  An exception that indicate a message should be rejected
-  """
-  pass
+    """
+    An exception that indicate a message should be rejected
+    """
+    pass
+
 
 class Release(ProtonException):
-  """
-  An exception that indicate a message should be rejected
-  """
-  pass
+    """
+    An exception that indicate a message should be rejected
+    """
+    pass
+
 
 class Acking(object):
     def accept(self, delivery):
@@ -146,6 +157,7 @@ class Acking(object):
             delivery.update(state)
         delivery.settle()
 
+
 class IncomingMessageHandler(Handler, Acking):
     """
     A utility for simpler and more intuitive handling of delivery
@@ -202,6 +214,7 @@ class IncomingMessageHandler(Handler, Acking):
         if self.delegate != None:
             dispatch(self.delegate, 'on_aborted', event)
 
+
 class EndpointStateHandler(Handler):
     """
     A utility that exposes 'endpoint' events i.e. the open/close for
@@ -272,7 +285,7 @@ class EndpointStateHandler(Handler):
                 return
             self.on_connection_error(event)
         elif self.is_local_closed(event.connection):
-           self.on_connection_closed(event)
+            self.on_connection_closed(event)
         else:
             self.on_connection_closing(event)
         event.connection.close()
@@ -391,12 +404,14 @@ class EndpointStateHandler(Handler):
         if self.delegate != None and event.connection and self.is_local_open(event.connection):
             dispatch(self.delegate, 'on_disconnected', event)
 
+
 class MessagingHandler(Handler, Acking):
     """
     A general purpose handler that makes the proton-c events somewhat
     simpler to deal with and/or avoids repetitive tasks for common use
     cases.
     """
+
     def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
         self.handlers = []
         if prefetch:
@@ -414,7 +429,8 @@ class MessagingHandler(Handler, Acking):
         """
         if event.transport.condition:
             if event.transport.condition.info:
-                log.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
+                log.error("%s: %s: %s" % (
+                event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
             else:
                 log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
             if event.transport.condition.name in self.fatal_conditions:
@@ -455,36 +471,43 @@ class MessagingHandler(Handler, Acking):
         Called when the event loop starts. (Just an alias for on_reactor_init)
         """
         pass
+
     def on_connection_closed(self, event):
         """
         Called when the connection is closed.
         """
         pass
+
     def on_session_closed(self, event):
         """
         Called when the session is closed.
         """
         pass
+
     def on_link_closed(self, event):
         """
         Called when the link is closed.
         """
         pass
+
     def on_connection_closing(self, event):
         """
         Called when the peer initiates the closing of the connection.
         """
         pass
+
     def on_session_closing(self, event):
         """
         Called when the peer initiates the closing of the session.
         """
         pass
+
     def on_link_closing(self, event):
         """
         Called when the peer initiates the closing of the link.
         """
         pass
+
     def on_disconnected(self, event):
         """
         Called when the socket is disconnected.
@@ -525,6 +548,7 @@ class MessagingHandler(Handler, Acking):
         retransmitted.
         """
         pass
+
     def on_message(self, event):
         """
         Called when a message is received. The message itself can be
@@ -535,11 +559,13 @@ class MessagingHandler(Handler, Acking):
         """
         pass
 
+
 class TransactionHandler(object):
     """
     The interface for transaction handlers, i.e. objects that want to
     be notified of state changes related to a transaction.
     """
+
     def on_transaction_declared(self, event):
         pass
 
@@ -555,6 +581,7 @@ class TransactionHandler(object):
     def on_transaction_commit_failed(self, event):
         pass
 
+
 class TransactionalClientHandler(MessagingHandler, TransactionHandler):
     """
     An extension to the MessagingHandler for applications using
@@ -570,24 +597,29 @@ class TransactionalClientHandler(MessagingHandler, TransactionHandler):
         else:
             super(TransactionalClientHandler, self).accept(delivery)
 
-from proton import WrappedHandler
+
+from ._events import WrappedHandler
 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
 
+
 class CFlowController(WrappedHandler):
 
     def __init__(self, window=1024):
         WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
 
+
 class CHandshaker(WrappedHandler):
 
     def __init__(self):
         WrappedHandler.__init__(self, pn_handshaker)
 
+
 class IOHandler(WrappedHandler):
 
     def __init__(self):
         WrappedHandler.__init__(self, pn_iohandler)
 
+
 class PythonIO:
 
     def __init__(self):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/python/proton/reactor.py b/python/proton/reactor.py
index d5d5183..ccdbf94 100644
--- a/python/proton/reactor.py
+++ b/python/proton/reactor.py
@@ -17,22 +17,35 @@ from __future__ import absolute_import
 # specific language governing permissions and limitations
 # under the License.
 #
-import logging, os, socket, time, types
-from heapq import heappush, heappop, nsmallest
+from __future__ import absolute_import
 
+import os
+import logging
 import traceback
 
-from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
-from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
-from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
-from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
-from select import select
+from proton import Connection, Delivery, Described
+from proton import Endpoint, EventType, Handler, Link, Message
+from proton import Session, SSL, SSLDomain, SSLUnavailable, symbol
+from proton import Terminus, Transport, ulong, Url
 from proton.handlers import OutgoingMessageHandler
-from proton import unicode2utf8, utf82unicode
 
-from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
-from .wrapper import Wrapper, PYCTX
-from cproton import *
+from proton import generate_uuid
+
+from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
+
+from ._events import EventBase
+from ._reactor_impl import Selectable, WrappedHandler, _chandler
+from ._wrapper import Wrapper, PYCTX
+
+from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
+    pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
+    pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
+    pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
+    pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
+    pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
+    pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
+    pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
+    pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
 
 from . import _compat
 
@@ -40,6 +53,17 @@ from ._compat import queue
 
 log = logging.getLogger("proton")
 
+
+def _timeout2millis(secs):
+    if secs is None: return PN_MILLIS_MAX
+    return secs2millis(secs)
+
+
+def _millis2timeout(millis):
+    if millis == PN_MILLIS_MAX: return None
+    return millis2secs(millis)
+
+
 class Task(Wrapper):
 
     @staticmethod
@@ -58,6 +82,7 @@ class Task(Wrapper):
     def cancel(self):
         pn_task_cancel(self._impl)
 
+
 class Acceptor(Wrapper):
 
     def __init__(self, impl):
@@ -69,6 +94,7 @@ class Acceptor(Wrapper):
     def close(self):
         pn_acceptor_close(self._impl)
 
+
 class Reactor(Wrapper):
 
     @staticmethod
@@ -95,11 +121,12 @@ class Reactor(Wrapper):
     # error will always be generated from a callback from this reactor.
     # Needed to prevent reference cycles and be compatible with wrappers.
     class ErrorDelegate(object):
-      def __init__(self, reactor):
-        self.reactor_impl = reactor._impl
-      def on_error(self, info):
-        ractor = Reactor.wrap(self.reactor_impl)
-        ractor.on_error(info)
+        def __init__(self, reactor):
+            self.reactor_impl = reactor._impl
+
+        def on_error(self, info):
+            ractor = Reactor.wrap(self.reactor_impl)
+            ractor.on_error(info)
 
     def on_error_delegate(self):
         return Reactor.ErrorDelegate(self).on_error
@@ -119,10 +146,10 @@ class Reactor(Wrapper):
     global_handler = property(_get_global, _set_global)
 
     def _get_timeout(self):
-        return millis2timeout(pn_reactor_get_timeout(self._impl))
+        return _millis2timeout(pn_reactor_get_timeout(self._impl))
 
     def _set_timeout(self, secs):
-        return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
+        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
 
     timeout = property(_get_timeout, _set_timeout)
 
@@ -244,7 +271,9 @@ class Reactor(Wrapper):
     def push_event(self, obj, etype):
         pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
 
-from proton import wrappers as _wrappers
+
+from ._events import wrappers as _wrappers
+
 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
 
@@ -258,6 +287,7 @@ class EventInjector(object):
     it. The close() method should be called when it is no longer
     needed, to allow the event loop to end if needed.
     """
+
     def __init__(self):
         self.queue = queue.Queue()
         self.pipe = os.pipe()
@@ -305,6 +335,7 @@ class ApplicationEvent(EventBase):
     Application defined event, which can optionally be associated with
     an engine object and or an arbitrary subject
     """
+
     def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
         super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
         self.connection = connection
@@ -323,10 +354,12 @@ class ApplicationEvent(EventBase):
         objects = [self.connection, self.session, self.link, self.delivery, self.subject]
         return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
 
+
 class Transaction(object):
     """
     Class to track state of an AMQP 1.0 transaction.
     """
+
     def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
         self.txn_ctrl = txn_ctrl
         self.handler = handler
@@ -397,7 +430,7 @@ class Transaction(object):
             if event.delivery.remote_state == Delivery.REJECTED:
                 if not self.failed:
                     self.handler.on_transaction_commit_failed(event)
-                    self._release_pending() # make this optional?
+                    self._release_pending()  # make this optional?
             else:
                 if self.failed:
                     self.handler.on_transaction_aborted(event)
@@ -406,16 +439,19 @@ class Transaction(object):
                     self.handler.on_transaction_committed(event)
             self._clear_pending()
 
+
 class LinkOption(object):
     """
     Abstract interface for link configuration options
     """
+
     def apply(self, link):
         """
         Subclasses will implement any configuration logic in this
         method
         """
         pass
+
     def test(self, link):
         """
         Subclasses can override this to selectively apply an option
@@ -423,23 +459,30 @@ class LinkOption(object):
         """
         return True
 
+
 class AtMostOnce(LinkOption):
     def apply(self, link):
         link.snd_settle_mode = Link.SND_SETTLED
 
+
 class AtLeastOnce(LinkOption):
     def apply(self, link):
         link.snd_settle_mode = Link.SND_UNSETTLED
         link.rcv_settle_mode = Link.RCV_FIRST
 
+
 class SenderOption(LinkOption):
     def apply(self, sender): pass
+
     def test(self, link): return link.is_sender
 
+
 class ReceiverOption(LinkOption):
     def apply(self, receiver): pass
+
     def test(self, link): return link.is_receiver
 
+
 class DynamicNodeProperties(LinkOption):
     def __init__(self, props={}):
         self.properties = {}
@@ -455,6 +498,7 @@ class DynamicNodeProperties(LinkOption):
         else:
             link.target.properties.put_dict(self.properties)
 
+
 class Filter(ReceiverOption):
     def __init__(self, filter_set={}):
         self.filter_set = filter_set
@@ -462,26 +506,32 @@ class Filter(ReceiverOption):
     def apply(self, receiver):
         receiver.source.filter.put_dict(self.filter_set)
 
+
 class Selector(Filter):
     """
     Configures a link with a message selector filter
     """
+
     def __init__(self, value, name='selector'):
         super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
 
+
 class DurableSubscription(ReceiverOption):
     def apply(self, receiver):
         receiver.source.durability = Terminus.DELIVERIES
         receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
 
+
 class Move(ReceiverOption):
     def apply(self, receiver):
         receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
 
+
 class Copy(ReceiverOption):
     def apply(self, receiver):
         receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
 
+
 def _apply_link_options(options, link):
     if options:
         if isinstance(options, list):
@@ -490,6 +540,7 @@ def _apply_link_options(options, link):
         else:
             if options.test(link): options.apply(link)
 
+
 def _create_session(connection, handler=None):
     session = connection.session()
     session.open()
@@ -502,6 +553,7 @@ def _get_attr(target, name):
     else:
         return None
 
+
 class SessionPerConnection(object):
     def __init__(self):
         self._default_session = None
@@ -511,11 +563,13 @@ class SessionPerConnection(object):
             self._default_session = _create_session(connection)
         return self._default_session
 
+
 class GlobalOverrides(object):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
+
     def __init__(self, base):
         self.base = base
 
@@ -527,11 +581,13 @@ class GlobalOverrides(object):
         conn = event.connection
         return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
 
+
 class Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
+
     def __init__(self, connection):
         self.connection = connection
         self.address = None
@@ -548,7 +604,7 @@ class Connector(Handler):
         self.max_frame_size = None
 
     def _connect(self, connection, reactor):
-        assert(reactor is not None)
+        assert (reactor is not None)
         url = self.address.next()
         reactor.set_connection_host(connection, url.host, str(url.port))
         # if virtual-host not set, use host from address as default
@@ -615,11 +671,13 @@ class Connector(Handler):
     def on_timer_task(self, event):
         self._connect(self.connection, event.reactor)
 
+
 class Backoff(object):
     """
     A reconnect strategy involving an increasing delay between
     retries, up to a maximum or 10 seconds.
     """
+
     def __init__(self):
         self.delay = 0
 
@@ -631,9 +689,10 @@ class Backoff(object):
         if current == 0:
             self.delay = 0.1
         else:
-            self.delay = min(10, 2*current)
+            self.delay = min(10, 2 * current)
         return current
 
+
 class Urls(object):
     def __init__(self, values):
         self.values = [Url(v) for v in values]
@@ -649,6 +708,7 @@ class Urls(object):
             self.i = iter(self.values)
             return next(self.i)
 
+
 class SSLConfig(object):
     def __init__(self):
         self.client = SSLDomain(SSLDomain.MODE_CLIENT)
@@ -670,6 +730,7 @@ class Container(Reactor):
        an extension to the Reactor class that adds convenience methods
        for creating connections and sender- or receiver- links.
     """
+
     def __init__(self, *handlers, **kwargs):
         super(Container, self).__init__(*handlers, **kwargs)
         if "impl" not in kwargs:
@@ -687,7 +748,8 @@ class Container(Reactor):
             self.password = None
             Wrapper.__setattr__(self, 'subclass', self.__class__)
 
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
+                **kwargs):
         """
         Initiates the establishment of an AMQP connection. Returns an
         instance of proton.Connection.
@@ -748,10 +810,14 @@ class Container(Reactor):
         connector.max_frame_size = kwargs.get('max_frame_size')
 
         conn._overrides = connector
-        if url: connector.address = Urls([url])
-        elif urls: connector.address = Urls(urls)
-        elif address: connector.address = address
-        else: raise ValueError("One of url, urls or address required")
+        if url:
+            connector.address = Urls([url])
+        elif urls:
+            connector.address = Urls(urls)
+        elif address:
+            connector.address = address
+        else:
+            raise ValueError("One of url, urls or address required")
         if heartbeat:
             connector.heartbeat = heartbeat
         if reconnect:
@@ -761,15 +827,19 @@ class Container(Reactor):
         # use container's default client domain if none specified.  This is
         # only necessary of the URL specifies the "amqps:" scheme
         connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
-        conn._session_policy = SessionPerConnection() #todo: make configurable
+        conn._session_policy = SessionPerConnection()  # todo: make configurable
         conn.open()
         return conn
 
     def _get_id(self, container, remote, local):
-        if local and remote: "%s-%s-%s" % (container, remote, local)
-        elif local: return "%s-%s" % (container, local)
-        elif remote: return "%s-%s" % (container, remote)
-        else: return "%s-%s" % (container, str(generate_uuid()))
+        if local and remote:
+            "%s-%s-%s" % (container, remote, local)
+        elif local:
+            return "%s-%s" % (container, local)
+        elif remote:
+            return "%s-%s" % (container, remote)
+        else:
+            return "%s-%s" % (container, str(generate_uuid()))
 
     def _get_session(self, context):
         if isinstance(context, Url):
@@ -806,7 +876,7 @@ class Container(Reactor):
         Various LinkOptions can be specified to further control the
         attachment.
         """
-        if isinstance(context, _compat.string_types):
+        if isstring(context):
             context = Url(context)
         if isinstance(context, Url) and not target:
             target = context.path
@@ -847,7 +917,7 @@ class Container(Reactor):
         Various LinkOptions can be specified to further control the
         attachment.
         """
-        if isinstance(context, _compat.string_types):
+        if isstring(context):
             context = Url(context)
         if isinstance(context, Url) and not source:
             source = context.path

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/python/proton/utils.py b/python/proton/utils.py
index 1d052d0..c6f8cb4 100644
--- a/python/proton/utils.py
+++ b/python/proton/utils.py
@@ -38,7 +38,8 @@ class BlockingLink(object):
             self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
                                  timeout=timeout,
                                  msg="Opening link %s" % self.link.name)
-        except Timeout as e: pass
+        except Timeout as e:
+            pass
         self._checkClosed()
 
     def _checkClosed(self):
@@ -53,31 +54,37 @@ class BlockingLink(object):
                              msg="Closing link %s" % self.link.name)
 
     # Access to other link attributes.
-    def __getattr__(self, name): return getattr(self.link, name)
+    def __getattr__(self, name):
+        return getattr(self.link, name)
+
 
 class SendException(ProtonException):
     """
     Exception used to indicate an exceptional state/condition on a send request
     """
+
     def __init__(self, state):
         self.state = state
 
+
 def _is_settled(delivery):
     return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
 
+
 class BlockingSender(BlockingLink):
     def __init__(self, connection, sender):
         super(BlockingSender, self).__init__(connection, sender)
         if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
-            #this may be followed by a detach, which may contain an error condition, so wait a little...
+            # this may be followed by a detach, which may contain an error condition, so wait a little...
             self._waitForClose()
-            #...but close ourselves if peer does not
+            # ...but close ourselves if peer does not
             self.link.close()
             raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
 
     def send(self, msg, timeout=False, error_states=None):
         delivery = self.link.send(msg)
-        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout)
+        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name,
+                             timeout=timeout)
         if delivery.link.snd_settle_mode != Link.SND_SETTLED:
             delivery.settle()
         bad = error_states
@@ -87,6 +94,7 @@ class BlockingSender(BlockingLink):
             raise SendException(delivery.remote_state)
         return delivery
 
+
 class Fetcher(MessagingHandler):
     def __init__(self, connection, prefetch):
         super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
@@ -96,7 +104,7 @@ class Fetcher(MessagingHandler):
 
     def on_message(self, event):
         self.incoming.append((event.message, event.delivery))
-        self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
 
     def on_link_error(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
@@ -129,9 +137,9 @@ class BlockingReceiver(BlockingLink):
     def __init__(self, connection, receiver, fetcher, credit=1):
         super(BlockingReceiver, self).__init__(connection, receiver)
         if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
-            #this may be followed by a detach, which may contain an error condition, so wait a little...
+            # this may be followed by a detach, which may contain an error condition, so wait a little...
             self._waitForClose()
-            #...but close ourselves if peer does not
+            # ...but close ourselves if peer does not
             self.link.close()
             raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
         if credit: receiver.flow(credit)
@@ -151,7 +159,8 @@ class BlockingReceiver(BlockingLink):
             raise Exception("Can't call receive on this receiver as a handler was provided")
         if not self.link.credit:
             self.link.flow(1)
-        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
+        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
+                             timeout=timeout)
         return self.fetcher.pop()
 
     def accept(self):
@@ -210,6 +219,7 @@ class BlockingConnection(Handler):
     object operations are enclosed in a try block and that close() is
     always executed on exit.
     """
+
     def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
         self.disconnected = False
         self.timeout = timeout or 60
@@ -221,7 +231,8 @@ class BlockingConnection(Handler):
         self.closing = False
         failed = True
         try:
-            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
+            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
+                                               heartbeat=heartbeat, **kwargs)
             self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                       msg="Opening connection")
             failed = False
@@ -230,7 +241,8 @@ class BlockingConnection(Handler):
                 self.close()
 
     def create_sender(self, address, handler=None, name=None, options=None):
-        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
+        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
+                                                                 options=options))
 
     def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
         prefetch = credit
@@ -241,7 +253,9 @@ class BlockingConnection(Handler):
         else:
             fetcher = Fetcher(self, credit)
         return BlockingReceiver(
-            self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
+            self,
+            self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
+                                           options=options), fetcher, credit=prefetch)
 
     def close(self):
         # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
@@ -259,8 +273,8 @@ class BlockingConnection(Handler):
             # Nothing left to block on.  Allow reactor to clean up.
             self.run()
             self.conn = None
-            self.container.global_handler = None # break circular ref: container to cadapter.on_error
-            pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
+            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
+            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
             self.container = None
 
     def _is_closed(self):
@@ -294,7 +308,7 @@ class BlockingConnection(Handler):
                 self.container.timeout = container_timeout
         if self.disconnected or self._is_closed():
             self.container.stop()
-            self.conn.handler = None # break cyclical reference
+            self.conn.handler = None  # break cyclical reference
         if self.disconnected and not self._is_closed():
             raise ConnectionException(
                 "Connection %s disconnected: %s" % (self.url, self.disconnected))
@@ -320,6 +334,7 @@ class BlockingConnection(Handler):
     def on_transport_closed(self, event):
         self.disconnected = event.transport.condition or "unknown"
 
+
 class AtomicCount(object):
     def __init__(self, start=0, step=1):
         """Thread-safe atomic counter. Start at start, increment by step."""
@@ -334,6 +349,7 @@ class AtomicCount(object):
         self.lock.release()
         return result
 
+
 class SyncRequestResponse(IncomingMessageHandler):
     """
     Implementation of the synchronous request-response (aka RPC) pattern.
@@ -374,12 +390,14 @@ class SyncRequestResponse(IncomingMessageHandler):
         request.reply_to = self.reply_to
         request.correlation_id = correlation_id = str(self.correlation_id.next())
         self.sender.send(request)
+
         def wakeup():
             return self.response and (self.response.correlation_id == correlation_id)
+
         self.connection.wait(wakeup, msg="Waiting for response")
         response = self.response
-        self.response = None    # Ready for next response.
-        self.receiver.flow(1)   # Set up credit for the next response.
+        self.response = None  # Ready for next response.
+        self.receiver.flow(1)  # Set up credit for the next response.
         return response
 
     @property
@@ -390,4 +408,4 @@ class SyncRequestResponse(IncomingMessageHandler):
     def on_message(self, event):
         """Called when we receive a message for our receiver."""
         self.response = event.message
-        self.connection.container.yield_() # Wake up the wait() loop to handle the message.
+        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org