You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/12/12 12:53:36 UTC

[1/2] qpid-proton git commit: Initial set of engine examples along with supporting additions to the library.

Repository: qpid-proton
Updated Branches:
  refs/heads/master 9a72a30cd -> 34e64e324


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
new file mode 100755
index 0000000..5b11280
--- /dev/null
+++ b/examples/engine/py/tx_send.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxSend(TransactionalClientHandler):  # sends `messages` messages to "examples", committing one transaction per `batch_size` sends
+    def __init__(self, messages, batch_size):
+        super(TxSend, self).__init__()
+        self.current_batch = 0  # sends made under the transaction in progress
+        self.committed = 0  # sends covered by transactions already committed
+        self.confirmed = 0  # number of on_accepted callbacks seen for self.sender
+        self.total = messages
+        self.batch_size = batch_size
+
+    def on_start(self, event):  # connect, create the sender and ask for the first transaction
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672", handler=self)
+        self.sender = self.container.create_sender(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
+        self.transaction = None  # remains None until on_transaction_declared stores the new transaction
+
+    def on_transaction_declared(self, event):  # remember the declared transaction and start sending
+        self.transaction = event.transaction
+        self.send()
+
+    def on_credit(self, event):  # try to send again whenever this callback fires
+        self.send()
+
+    def send(self):  # send while a transaction is held, the sender has credit and committed < total
+        while self.transaction and self.sender.credit and self.committed < self.total:
+            msg = Message(body={'sequence':(self.committed+self.current_batch+1)})  # 1-based sequence number
+            self.sender.send_msg(msg, transaction=self.transaction)
+            self.current_batch += 1
+            if self.current_batch == self.batch_size:  # batch is full: commit without waiting for acceptance
+                self.transaction.commit()
+                self.transaction = None  # ends the loop until another transaction is declared
+
+    def on_accepted(self, event):  # only counts acceptances; the count is not used for committing here
+        if event.sender == self.sender:
+            self.confirmed += 1
+
+    def on_transaction_committed(self, event):  # account for the batch, then finish or start the next transaction
+        self.committed += self.current_batch
+        if self.committed == self.total:  # NOTE(review): only matches exactly when total is a multiple of batch_size -- confirm
+            print "all messages committed"
+            event.connection.close()
+        else:
+            self.current_batch = 0
+            self.container.declare_transaction(self.conn, handler=self)
+
+    def on_disconnected(self, event):  # forget the count of sends in the uncommitted batch
+        self.current_batch = 0
+
+try:
+    Container(TxSend(10000, 10)).run()  # 10000 messages in batches of 10
+except KeyboardInterrupt: pass  # exit quietly when interrupted from the keyboard

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py
new file mode 100755
index 0000000..c051408
--- /dev/null
+++ b/examples/engine/py/tx_send_sync.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxSend(TransactionalClientHandler):  # like tx_send.py, but commits a batch only after all of its sends were accepted
+    def __init__(self, messages, batch_size):
+        super(TxSend, self).__init__()
+        self.current_batch = 0  # sends made under the transaction in progress
+        self.committed = 0  # sends covered by transactions already committed
+        self.confirmed = 0  # acceptances seen for the current batch (reset on commit)
+        self.total = messages
+        self.batch_size = batch_size
+
+    def on_start(self, event):  # connect, create the sender and ask for the first transaction
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672", handler=self)
+        self.sender = self.container.create_sender(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
+        self.transaction = None  # remains None until on_transaction_declared stores the new transaction
+
+    def on_transaction_declared(self, event):  # remember the declared transaction and start sending
+        self.transaction = event.transaction
+        self.send()
+
+    def on_credit(self, event):  # try to send again whenever this callback fires
+        self.send()
+
+    def send(self):  # send at most batch_size messages per transaction; committing happens in on_accepted
+        while self.transaction and self.current_batch < self.batch_size and self.sender.credit and self.committed < self.total:
+            msg = Message(body={'sequence':(self.committed+self.current_batch+1)})  # 1-based sequence number
+            self.sender.send_msg(msg, transaction=self.transaction)
+            self.current_batch += 1
+
+    def on_accepted(self, event):  # count acceptances; commit once batch_size of them have arrived
+        if event.sender == self.sender:
+            self.confirmed += 1
+            if self.confirmed == self.batch_size:
+                self.transaction.commit()
+                self.transaction = None  # no further sends until another transaction is declared
+                self.confirmed = 0
+
+    def on_transaction_committed(self, event):  # account for the batch, then finish or start the next transaction
+        self.committed += self.current_batch
+        if self.committed == self.total:  # NOTE(review): only matches exactly when total is a multiple of batch_size -- confirm
+            print "all messages committed"
+            event.connection.close()
+        else:
+            self.current_batch = 0
+            self.container.declare_transaction(self.conn, handler=self)
+
+    def on_disconnected(self, event):  # forget the count of sends in the uncommitted batch
+        self.current_batch = 0
+
+try:
+    Container(TxSend(10000, 10)).run()  # 10000 messages in batches of 10
+except KeyboardInterrupt: pass  # exit quietly when interrupted from the keyboard

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/CMakeLists.txt b/proton-c/bindings/python/CMakeLists.txt
index 6be421e..ee660b6 100644
--- a/proton-c/bindings/python/CMakeLists.txt
+++ b/proton-c/bindings/python/CMakeLists.txt
@@ -53,22 +53,27 @@ if (NOT PYTHON_SITEARCH_PACKAGES)
 endif()
 
 set (pysrc-generated cproton.py)
-set (pysrc proton/__init__.py)
+set (pysrc
+    proton/__init__.py
+    proton/handlers.py
+    proton/reactors.py
+    proton/utils.py
+    )
 
-macro (py_compile directory files)
+macro (py_compile directory files artifacts)
   foreach (src_file ${files})
     install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile ${src_file}
                                   WORKING_DIRECTORY ${directory})")
     install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile ${src_file}
                                   WORKING_DIRECTORY ${directory})")
-    list(APPEND PYTHON_ARTIFACTS ${directory}/${src_file}
+    list(APPEND ${artifacts} ${directory}/${src_file}
       ${directory}/${src_file}c
       ${directory}/${src_file}o)
   endforeach (src_file)
 endmacro(py_compile)
 
-py_compile(${CMAKE_CURRENT_BINARY_DIR} ${pysrc-generated})
-py_compile(${CMAKE_CURRENT_SOURCE_DIR} ${pysrc})
+py_compile(${CMAKE_CURRENT_BINARY_DIR} ${pysrc-generated} CPROTON_ARTIFACTS)
+py_compile(${CMAKE_CURRENT_SOURCE_DIR} "${pysrc}" PROTON_ARTIFACTS)
 
 find_program(EPYDOC_EXE epydoc)
 mark_as_advanced (EPYDOC_EXE)
@@ -84,9 +89,12 @@ if (EPYDOC_EXE)
            ${OPTIONAL_ARG})
 endif (EPYDOC_EXE)
 
-install(FILES ${PYTHON_ARTIFACTS}
+install(FILES ${CPROTON_ARTIFACTS}
         DESTINATION ${PYTHON_SITEARCH_PACKAGES}
         COMPONENT Python)
+install(FILES ${PROTON_ARTIFACTS}
+        DESTINATION "${PYTHON_SITEARCH_PACKAGES}/proton/"
+        COMPONENT Python)
 install(TARGETS ${SWIG_MODULE_cproton_REAL_NAME}
         DESTINATION ${PYTHON_SITEARCH_PACKAGES}
         COMPONENT Python)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 0fb7270..a4ec1c6 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -85,6 +85,9 @@ except ImportError:
   def uuid4():
     return uuid.UUID(bytes=random_uuid())
 
+def generate_uuid():  # returns uuid.uuid4(); NOTE(review): the ImportError fallback just above defines uuid4() as a plain function -- confirm uuid.uuid4 exists on that path
+  return uuid.uuid4()
+
 try:
   bytes()
 except NameError:
@@ -2965,7 +2968,24 @@ class Delivery(object):
 
   @property
   def link(self):
-    return Link._wrap_link(pn_delivery_link(self._dlv))
+    if not self.released:
+      return Link._wrap_link(pn_delivery_link(self._dlv))
+    else:
+      return None
+
+  @property
+  def session(self):  # session of this delivery's link, or None when self.link is falsy
+    if self.link:
+      return self.link.session
+    else:
+      return None
+
+  @property
+  def connection(self):  # connection of this delivery's session, or None when self.session is falsy
+    if self.session:
+      return self.session.connection
+    else:
+      return None
 
 class TransportException(ProtonException):
   pass
@@ -3384,7 +3404,10 @@ class Collector:
 
     clazz = pn_class_name(pn_event_class(event))
     context = wrappers[clazz](pn_event_context(event))
-    return Event(clazz, context, EventType.TYPES[pn_event_type(event)])
+    if isinstance(context, EventBase):
+      return context
+    else:
+      return Event(clazz, context, EventType.TYPES[pn_event_type(event)])
 
   def pop(self):
     ev = self.peek()
@@ -3396,7 +3419,7 @@ class Collector:
     pn_collector_free(self._impl)
     del self._impl
 
-class EventType:
+class EventType(object):
 
   TYPES = {}
 
@@ -3416,7 +3439,20 @@ def dispatch(handler, method, *args):
   elif hasattr(handler, "on_unhandled"):
     return handler.on_unhandled(method, args)
 
-class Event:
+class EventBase(object):  # minimal event: holds clazz, context and type, and can dispatch itself to a handler
+
+  def __init__(self, clazz, context, type):
+    self.clazz = clazz
+    self.context = context
+    self.type = type  # dispatch() reads type.method as the handler method name
+
+  def _popped(self, collector):  # no-op hook here; the Event subclass overrides it
+    pass
+
+  def dispatch(self, handler):  # call the module-level dispatch() with this event as the only argument
+    return dispatch(handler, self.type.method, self)
+
+class Event(EventBase):
 
   CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init")
   CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound")
@@ -3453,28 +3489,19 @@ class Event:
   TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed")
 
   def __init__(self, clazz, context, type):
-    self.clazz = clazz
-    self.context = context
-    self.type = type
+    super(Event, self).__init__(clazz, context, type)
 
   def _popped(self, collector):
     if self.type in (Event.LINK_FINAL, Event.SESSION_FINAL,
                      Event.CONNECTION_FINAL):
       collector._contexts.remove(self.context)
 
-  def dispatch(self, handler):
-    return dispatch(handler, self.type.method, self)
-
   @property
   def connection(self):
     if self.clazz == "pn_connection":
       return self.context
-    elif self.clazz == "pn_session":
-      return self.context.connection
-    elif self.clazz == "pn_link":
+    elif self.clazz in ["pn_transport", "pn_session", "pn_link", "pn_delivery"]:
       return self.context.connection
-    elif self.clazz == "pn_delivery" and not self.context.released:
-      return self.context.link.connection
     else:
       return None
 
@@ -3482,10 +3509,8 @@ class Event:
   def session(self):
     if self.clazz == "pn_session":
       return self.context
-    elif self.clazz == "pn_link":
+    elif self.clazz in ["pn_link", "pn_delivery"]:
       return self.context.session
-    elif self.clazz == "pn_delivery" and not self.context.released:
-      return self.context.link.session
     else:
       return None
 
@@ -3493,7 +3518,7 @@ class Event:
   def link(self):
     if self.clazz == "pn_link":
       return self.context
-    elif self.clazz == "pn_delivery" and not self.context.released:
+    elif self.clazz in ["pn_delivery"]:
       return self.context.link
     else:
       return None

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
new file mode 100644
index 0000000..71ab837
--- /dev/null
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -0,0 +1,440 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import heapq, os, Queue, re, socket, time, types
+from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
+from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
+from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+from select import select
+
+class FlowController(Handler):
+    """
+    A handler that controls a configured credit window for associated
+    receivers.
+    """
+    def __init__(self, window=1):
+        self.window = window  # target credit level restored by top_up()
+
+    def top_up(self, link):  # issue the difference between the window and the link's current credit
+        delta = self.window - link.credit
+        link.flow(delta)
+
+    def on_link_local_open(self, event):  # top up receiver links only
+        if event.link.is_receiver:
+            self.top_up(event.link)
+
+    def on_link_remote_open(self, event):  # top up receiver links only
+        if event.link.is_receiver:
+            self.top_up(event.link)
+
+    def on_link_flow(self, event):  # top up receiver links only
+        if event.link.is_receiver:
+            self.top_up(event.link)
+
+    def on_delivery(self, event):  # skip released deliveries, then top up the delivery's receiver link
+        if not event.delivery.released and event.delivery.link.is_receiver:
+            self.top_up(event.delivery.link)
+
+def nested_handlers(handlers):  # returns a new list: each handler followed by the items of its 'handlers' attribute, if any
+    # currently only allows for a single level of nesting
+    nested = []
+    for h in handlers:
+        nested.append(h)
+        if hasattr(h, 'handlers'):
+            nested.extend(getattr(h, 'handlers'))
+    return nested
+
+def add_nested_handler(handler, nested):  # append `nested` to handler.handlers, creating that list when absent
+    if hasattr(handler, 'handlers'):
+        getattr(handler, 'handlers').append(nested)
+    else:
+        handler.handlers = [nested]
+
+class ScopedHandler(Handler):
+    """
+    An internal handler that checks for handlers scoped to the engine
+    objects an event relates to. E.g. it allows delivery, link, session
+    or connection scoped handlers that will only be called with events
+    for the object to which they are scoped.
+    """
+    scopes = ["delivery", "link", "session", "connection"]  # event attributes inspected, in this order
+
+    def on_unhandled(self, method, args):  # forward the event (args[0]) to handlers stored as object contexts
+        event = args[0]
+        if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:  # *_FINAL events are not forwarded
+            return
+
+        objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]  # truthy scoped objects on the event
+        targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]  # each object's context attribute
+        handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]  # methods named after the event type
+        for h in handlers:
+            h(event)
+
+
+class OutgoingMessageHandler(Handler):
+    """
+    A utility for simpler and more intuitive handling of delivery
+    events related to outgoing i.e. sent messages.
+    """
+    def __init__(self, auto_settle=True, delegate=None):
+        self.auto_settle = auto_settle  # when true, on_delivery settles each updated sender delivery
+        self.delegate = delegate  # optional object that receives the on_* callbacks below via dispatch()
+
+    def on_link_flow(self, event):  # raise on_credit when a sender link has credit
+        if event.link.is_sender and event.link.credit:
+            self.on_credit(event)
+
+    def on_delivery(self, event):  # map the remote state of an updated sender delivery onto on_* callbacks
+        dlv = event.delivery
+        if dlv.released: return
+        if dlv.link.is_sender and dlv.updated:
+            if dlv.remote_state == Delivery.ACCEPTED:
+                self.on_accepted(event)
+            elif dlv.remote_state == Delivery.REJECTED:
+                self.on_rejected(event)
+            elif dlv.remote_state == Delivery.RELEASED:
+                self.on_released(event)
+            elif dlv.remote_state == Delivery.MODIFIED:
+                self.on_modified(event)
+            if dlv.settled:  # checked independently of the remote state above
+                self.on_settled(event)
+            if self.auto_settle:
+                dlv.settle()
+
+    def on_credit(self, event):  # this and the callbacks below just forward to the delegate, if one is set
+        if self.delegate:
+            dispatch(self.delegate, 'on_credit', event)
+
+    def on_accepted(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_accepted', event)
+
+    def on_rejected(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_rejected', event)
+
+    def on_released(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_released', event)
+
+    def on_modified(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_modified', event)
+
+    def on_settled(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_settled', event)
+
+def recv_msg(delivery):  # read delivery.pending bytes from the delivery's link, decode them into a new Message and return it
+    msg = Message()
+    msg.decode(delivery.link.recv(delivery.pending))
+    delivery.link.advance()  # called after the read, before returning the message
+    return msg
+
+class Reject(ProtonException):
+  """
+  An exception that indicates a message should be rejected
+  """
+  pass
+
+class Acking(object):  # mixin offering accept/reject/release helpers that update and settle a delivery
+    def accept(self, delivery):
+        self.settle(delivery, Delivery.ACCEPTED)
+
+    def reject(self, delivery):
+        self.settle(delivery, Delivery.REJECTED)
+
+    def release(self, delivery, delivered=True):  # delivered=True settles as MODIFIED, otherwise as RELEASED
+        if delivered:
+            self.settle(delivery, Delivery.MODIFIED)
+        else:
+            self.settle(delivery, Delivery.RELEASED)
+
+    def settle(self, delivery, state=None):  # apply `state` first when one is given, then settle
+        if state:
+            delivery.update(state)
+        delivery.settle()
+
+class IncomingMessageHandler(Handler, Acking):
+    """
+    A utility for simpler and more intuitive handling of delivery
+    events related to incoming i.e. received messages.
+    """
+
+    def __init__(self, auto_accept=True, delegate=None):
+        self.delegate = delegate  # optional object that receives on_message / on_settled via dispatch()
+        self.auto_accept = auto_accept  # when true, accept and settle after on_message returns normally
+
+    def on_delivery(self, event):  # turn a complete readable delivery into on_message, or report settlement
+        dlv = event.delivery
+        if dlv.released or not dlv.link.is_receiver: return
+        if dlv.readable and not dlv.partial:
+            event.message = recv_msg(dlv)  # the decoded message is attached to the event for the callback
+            try:
+                self.on_message(event)
+                if self.auto_accept:
+                    dlv.update(Delivery.ACCEPTED)
+                    dlv.settle()
+            except Reject:  # raised from on_message to reject the message
+                dlv.update(Delivery.REJECTED)
+                dlv.settle()
+        elif dlv.updated and dlv.settled:
+            self.on_settled(event)
+
+    def on_message(self, event):  # forward to the delegate, if one is set
+        if self.delegate:
+            dispatch(self.delegate, 'on_message', event)
+
+    def on_settled(self, event):  # forward to the delegate, if one is set
+        if self.delegate:
+            dispatch(self.delegate, 'on_settled', event)
+
+class EndpointStateHandler(Handler):
+    """
+    A utility that exposes 'endpoint' events i.e. the open/close for
+    links, sessions and connections in a more intuitive manner. A
+    XXX_opened method will be called when both local and remote peers
+    have opened the link, session or connection. This can be used to
+    confirm a locally initiated action for example. A XXX_opening
+    method will be called when the remote peer has requested an open
+    that was not initiated locally. By default this will simply open
+    locally, which then triggers the XXX_opened call. The same applies
+    to close.
+    """
+
+    def __init__(self, peer_close_is_error=False, delegate=None):
+        self.delegate = delegate  # optional object that receives the on_* callbacks via dispatch()
+        self.peer_close_is_error = peer_close_is_error  # without a delegate, route XXX_closing to XXX_error
+
+    def is_local_open(self, endpoint):  # the is_* helpers return the masked state bits, not a strict bool
+        return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+    def is_local_uninitialised(self, endpoint):
+        return endpoint.state & Endpoint.LOCAL_UNINIT
+
+    def is_local_closed(self, endpoint):
+        return endpoint.state & Endpoint.LOCAL_CLOSED
+
+    def is_remote_open(self, endpoint):
+        return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+    def is_remote_closed(self, endpoint):
+        return endpoint.state & Endpoint.REMOTE_CLOSED
+
+    def print_error(self, endpoint, endpoint_type):  # print the remote condition, or a "closed by peer" notice
+        if endpoint.remote_condition:
+            print endpoint.remote_condition.description
+        elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
+            print "%s closed by peer" % endpoint_type
+
+    def on_link_remote_close(self, event):  # classify the close as error / closed / closing, then close locally
+        if event.link.remote_condition:
+            self.on_link_error(event)
+        elif self.is_local_closed(event.link):
+            self.on_link_closed(event)
+        else:
+            self.on_link_closing(event)
+        event.link.close()  # runs on every path above
+
+    def on_session_remote_close(self, event):  # same classification as for links
+        if event.session.remote_condition:
+            self.on_session_error(event)
+        elif self.is_local_closed(event.session):
+            self.on_session_closed(event)
+        else:
+            self.on_session_closing(event)
+        event.session.close()  # runs on every path above
+
+    def on_connection_remote_close(self, event):  # same classification as for links
+        if event.connection.remote_condition:
+            self.on_connection_error(event)
+        elif self.is_local_closed(event.connection):
+            self.on_connection_closed(event)
+        else:
+            self.on_connection_closing(event)
+        event.connection.close()  # runs on every path above
+
+    def on_connection_local_open(self, event):  # opened once the remote side is open too
+        if self.is_remote_open(event.connection):
+            self.on_connection_opened(event)
+
+    def on_connection_remote_open(self, event):  # opened if already open locally, else opening + local open
+        if self.is_local_open(event.connection):
+            self.on_connection_opened(event)
+        elif self.is_local_uninitialised(event.connection):
+            self.on_connection_opening(event)
+            event.connection.open()
+
+    def on_session_local_open(self, event):  # opened once the remote side is open too
+        if self.is_remote_open(event.session):
+            self.on_session_opened(event)
+
+    def on_session_remote_open(self, event):  # opened if already open locally, else opening + local open
+        if self.is_local_open(event.session):
+            self.on_session_opened(event)
+        elif self.is_local_uninitialised(event.session):
+            self.on_session_opening(event)
+            event.session.open()
+
+    def on_link_local_open(self, event):  # opened once the remote side is open too
+        if self.is_remote_open(event.link):
+            self.on_link_opened(event)
+
+    def on_link_remote_open(self, event):  # opened if already open locally, else opening + local open
+        if self.is_local_open(event.link):
+            self.on_link_opened(event)
+        elif self.is_local_uninitialised(event.link):
+            self.on_link_opening(event)
+            event.link.open()
+
+    def on_connection_opened(self, event):  # the opened/opening callbacks only forward to the delegate
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_opened', event)
+
+    def on_session_opened(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_opened', event)
+
+    def on_link_opened(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_opened', event)
+
+    def on_connection_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_opening', event)
+
+    def on_session_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_opening', event)
+
+    def on_link_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_opening', event)
+
+    def on_connection_error(self, event):  # without a delegate, just print the error
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_error', event)
+        else:
+            self.print_error(event.connection, "connection")
+
+    def on_session_error(self, event):  # without a delegate, print the error and close the connection
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_error', event)
+        else:
+            self.print_error(event.session, "session")
+            event.connection.close()
+
+    def on_link_error(self, event):  # without a delegate, print the error and close the connection
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_error', event)
+        else:
+            self.print_error(event.link, "link")
+            event.connection.close()
+
+    def on_connection_closed(self, event):  # the closed callbacks only forward to the delegate
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_closed', event)
+
+    def on_session_closed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_closed', event)
+
+    def on_link_closed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_closed', event)
+
+    def on_connection_closing(self, event):  # delegate first; otherwise an error only if peer_close_is_error
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_closing', event)
+        elif self.peer_close_is_error:
+            self.on_connection_error(event)
+
+    def on_session_closing(self, event):  # delegate first; otherwise an error only if peer_close_is_error
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_closing', event)
+        elif self.peer_close_is_error:
+            self.on_session_error(event)
+
+    def on_link_closing(self, event):  # delegate first; otherwise an error only if peer_close_is_error
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_closing', event)
+        elif self.peer_close_is_error:
+            self.on_link_error(event)
+
+class MessagingHandler(Handler, Acking):
+    """
+    A general purpose handler that makes the proton-c events somewhat
+    simpler to deal with and/or avoids repetitive tasks for common use
+    cases.
+    """
+    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
+        self.handlers = []  # nested handlers; each of the last three gets self as its delegate
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        if prefetch:  # a falsy prefetch leaves out the FlowController
+            self.handlers.append(FlowController(prefetch))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+        self.handlers.append(IncomingMessageHandler(auto_accept, self))
+        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
+
+class TransactionalAcking(object):  # mixin whose accept() hands the delivery to the given transaction
+    def accept(self, delivery, transaction):
+        transaction.accept(delivery)
+
+class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):  # outgoing-message handler that also relays transaction events
+    def __init__(self, auto_settle=True, delegate=None):
+        super(TransactionHandler, self).__init__(auto_settle, delegate)
+
+    def on_settled(self, event):  # overrides the base: deliveries carrying a `transaction` attribute go to that transaction
+        if hasattr(event.delivery, "transaction"):
+            event.transaction = event.delivery.transaction
+            event.delivery.transaction.handle_outcome(event)  # note: on_settled is not forwarded to the delegate here
+
+    def on_transaction_declared(self, event):  # the on_transaction_* callbacks only forward to the delegate
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_declared', event)
+
+    def on_transaction_committed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_committed', event)
+
+    def on_transaction_aborted(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_aborted', event)
+
+    def on_transaction_declare_failed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_declare_failed', event)
+
+    def on_transaction_commit_failed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_commit_failed', event)
+
+class TransactionalClientHandler(Handler, TransactionalAcking):  # same nested handlers as MessagingHandler, but with a TransactionHandler for outgoing deliveries
+    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):  # auto_accept defaults to False here
+        super(TransactionalClientHandler, self).__init__()
+        self.handlers = []  # nested handlers; each of the last three gets self as its delegate
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        if prefetch:  # a falsy prefetch leaves out the FlowController
+            self.handlers.append(FlowController(prefetch))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+        self.handlers.append(IncomingMessageHandler(auto_accept, self))
+        self.handlers.append(TransactionHandler(auto_settle, self))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
new file mode 100644
index 0000000..cab8c31
--- /dev/null
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -0,0 +1,827 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os, Queue, socket, time, types
+from heapq import heappush, heappop, nsmallest
+from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
+from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
+from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
+from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
+from select import select
+from proton.handlers import nested_handlers, ScopedHandler
+
+class AmqpSocket(object):
+    """
+    Associates a transport with a connection and a socket and can be
+    used in an io loop to track the io for an AMQP 1.0 connection.
+    """
+
+    def __init__(self, conn, sock, events, heartbeat=None):
+        # events is kept so that removed() can dispatch a
+        # 'disconnected' event; heartbeat, when set, becomes the
+        # transport's idle timeout.
+        self.events = events
+        self.conn = conn
+        self.transport = Transport()
+        if heartbeat: self.transport.idle_timeout = heartbeat
+        self.transport.bind(self.conn)
+        self.socket = sock
+        self.socket.setblocking(0)
+        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        # write_done/read_done record that the respective direction is
+        # finished; closed() reports True once both are set.
+        self.write_done = False
+        self.read_done = False
+        self._closed = False
+
+    def accept(self, force_sasl=True):
+        """
+        Configure the transport for an incoming connection; with
+        force_sasl a server-side ANONYMOUS SASL layer is set up and
+        marked as done with SASL.OK. Returns self.
+        """
+        if force_sasl:
+            sasl = self.transport.sasl()
+            sasl.mechanisms("ANONYMOUS")
+            sasl.server()
+            sasl.done(SASL.OK)
+        #TODO: use SASL anyway if requested by peer
+        return self
+
+    def connect(self, host, port=None, username=None, password=None, force_sasl=True):
+        """
+        Configure SASL (PLAIN when both username and password are
+        given, otherwise ANONYMOUS if force_sasl) and start connecting
+        the socket to host:port (port defaults to 5672). Returns self.
+
+        Raises ConnectionException if the host cannot be resolved.
+        """
+        if username and password:
+            sasl = self.transport.sasl()
+            sasl.plain(username, password)
+        elif force_sasl:
+            sasl = self.transport.sasl()
+            sasl.mechanisms('ANONYMOUS')
+            sasl.client()
+        try:
+            # connect_ex returns an error code rather than raising; the
+            # code is ignored here (the socket is non-blocking).
+            self.socket.connect_ex((host, port or 5672))
+        except socket.gaierror, e:
+            raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
+        return self
+
+    def _closed_cleanly(self):
+        # True-ish only if the connection has been closed both locally
+        # and by the remote peer.
+        return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
+
+    def closed(self):
+        """
+        Close the socket and return True the first time both
+        directions are done; return False otherwise (including on any
+        call made after the socket has already been closed).
+        """
+        if not self._closed and self.write_done and self.read_done:
+            self.close()
+            return True
+        else:
+            return False
+
+    def close(self):
+        self.socket.close()
+        self._closed = True
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def reading(self):
+        """Return True if the transport currently has capacity for more input."""
+        if self.read_done: return False
+        c = self.transport.capacity()
+        if c > 0:
+            return True
+        elif c < 0:
+            # negative capacity: treat the read side as finished
+            self.read_done = True
+        return False
+
+    def writing(self):
+        """Return True if the transport currently has output pending."""
+        if self.write_done: return False
+        try:
+            p = self.transport.pending()
+            if p > 0:
+                return True
+            elif p < 0:
+                # negative pending: treat the write side as finished
+                self.write_done = True
+                return False
+            else: # p == 0
+                return False
+        except TransportException, e:
+            self.write_done = True
+            return False
+
+    def readable(self):
+        """Read up to the transport's capacity from the socket and push it into the transport."""
+        c = self.transport.capacity()
+        if c > 0:
+            try:
+                data = self.socket.recv(c)
+                if data:
+                    self.transport.push(data)
+                else:
+                    # recv returned nothing (presumably the peer closed
+                    # the socket): give up on both directions unless the
+                    # connection was closed cleanly, in which case only
+                    # the transport's tail is closed.
+                    if not self._closed_cleanly():
+                        self.read_done = True
+                        self.write_done = True
+                    else:
+                        self.transport.close_tail()
+            except TransportException, e:
+                print "Error on read: %s" % e
+                self.read_done = True
+            except socket.error, e:
+                print "Error on recv: %s" % e
+                self.read_done = True
+                self.write_done = True
+        elif c < 0:
+            self.read_done = True
+
+    def writable(self):
+        """Send the transport's pending output on the socket, popping only what was actually sent."""
+        try:
+            p = self.transport.pending()
+            if p > 0:
+                data = self.transport.peek(p)
+                # send may write fewer than p bytes; only n are popped
+                n = self.socket.send(data)
+                self.transport.pop(n)
+            elif p < 0:
+                self.write_done = True
+        except TransportException, e:
+            print "Error on write: %s" % e
+            self.write_done = True
+        except socket.error, e:
+            print "Error on send: %s" % e
+            self.write_done = True
+
+    def removed(self):
+        """
+        Called by the loop after this selectable has been dropped: if
+        the connection was not closed cleanly, unbind the transport
+        and dispatch a 'disconnected' event for the connection.
+        """
+        if not self._closed_cleanly():
+            self.transport.unbind()
+            self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
+
+    def tick(self):
+        # Returns whatever transport.tick() reports for the current
+        # time, normalising any false value to None.
+        t = self.transport.tick(time.time())
+        if t: return t
+        else: return None
+
+class AmqpAcceptor:
+    """
+    Listens for incoming sockets, creates an AmqpSocket for them and
+    adds that to the list of tracked 'selectables'. The acceptor can
+    itself be added to an io loop.
+    """
+
+    def __init__(self, events, loop, host, port):
+        # Opens a non-blocking listening socket bound to (host, port)
+        # and registers this acceptor with loop straight away.
+        self.events = events
+        self.loop = loop
+        self.socket = socket.socket()
+        self.socket.setblocking(0)
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.bind((host, port))
+        self.socket.listen(5)
+        self.loop.add(self)
+        self._closed = False
+
+    def closed(self):
+        # The listening socket is actually closed here, i.e. when
+        # closed() is next polled after close() has been requested.
+        if self._closed:
+            self.socket.close()
+            return True
+        else:
+            return False
+
+    def close(self):
+        # Only flags the acceptor as closed; see closed().
+        self._closed = True
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def reading(self):
+        return not self._closed
+
+    def writing(self):
+        return False
+
+    def readable(self):
+        # Accept one incoming socket and track it through a new
+        # AmqpSocket bound to a fresh connection from self.events.
+        sock, addr = self.socket.accept()
+        if sock:
+            self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept())
+
+    def removed(self): pass
+    def tick(self): return None
+
+
+class EventInjector(object):
+    """
+    Can be added to an io loop to allow events to be triggered by an
+    external thread but handled on the event thread associated with
+    the loop.
+    """
+    def __init__(self, collector):
+        self.collector = collector
+        self.queue = Queue.Queue()
+        # pipe[0] is the read end exposed through fileno(); pipe[1] is
+        # written to by trigger() to wake the loop up.
+        # NOTE(review): nothing in this class closes the pipe's file
+        # descriptors.
+        self.pipe = os.pipe()
+        self._closed = False
+
+    def trigger(self, event):
+        """Queue event and write one byte to the pipe so that the read end becomes readable."""
+        self.queue.put(event)
+        os.write(self.pipe[1], "!")
+
+    def closed(self):
+        # Not reported as closed until every queued event has been
+        # drained by readable().
+        return self._closed and self.queue.empty()
+
+    def close(self):
+        self._closed = True
+
+    def fileno(self):
+        return self.pipe[0]
+
+    def reading(self):
+        return True
+
+    def writing(self):
+        return False
+
+    def readable(self):
+        """Consume wake-up bytes from the pipe, then move all queued events into the collector."""
+        os.read(self.pipe[0], 512)
+        while not self.queue.empty():
+            event = self.queue.get()
+            self.collector.put(event.context, event.type)
+
+    def removed(self): pass
+    def tick(self): return None
+
+class PQueue:
+    """
+    Minimal priority queue of (priority, task) entries kept in a
+    heapq-managed list; the entry with the smallest priority comes
+    out first.
+    """
+
+    def __init__(self):
+        self.entries = []
+
+    def add(self, priority, task):
+        """Insert task with the given priority."""
+        heappush(self.entries, (priority, task))
+
+    def peek(self):
+        """Return the smallest (priority, task) entry without removing it, or None if empty."""
+        if self.entries:
+            # The root of a heap is always its smallest entry, and is
+            # exactly the entry pop() would remove, so there is no need
+            # to scan the whole list.
+            return self.entries[0]
+        else:
+            return None
+
+    def pop(self):
+        """Remove and return the smallest (priority, task) entry, or None if empty."""
+        if self.entries:
+            return heappop(self.entries)
+        else:
+            return None
+
+    def __nonzero__(self):
+        return bool(self.entries)
+
+class Timer:
+    """
+    Holds events scheduled against a deadline and hands them to the
+    collector once that deadline has passed.
+    """
+    def __init__(self, collector):
+        self.collector = collector
+        self.events = PQueue()
+
+    def schedule(self, deadline, event):
+        """Register event to be fired once the current time exceeds deadline."""
+        self.events.add(deadline, event)
+
+    def tick(self):
+        """
+        Fire every event whose deadline has passed.
+
+        @return the deadline of the earliest event still waiting, or
+        None if nothing remains scheduled
+        """
+        while self.events:
+            due_at, scheduled = self.events.peek()
+            if not time.time() > due_at:
+                return due_at
+            self.events.pop()
+            self.collector.put(scheduled.context, scheduled.type)
+        return None
+
+    @property
+    def pending(self):
+        """True while at least one event is still scheduled."""
+        return bool(self.events)
+
+class Events(object):
+    """
+    Owns the collector and timer, and dispatches collected events to
+    a fixed sequence of handlers.
+    """
+    def __init__(self, *handlers):
+        self.collector = Collector()
+        self.timer = Timer(self.collector)
+        self.handlers = handlers
+
+    def connection(self):
+        """Create a new Connection whose events are collected by this object's collector."""
+        conn = Connection()
+        conn.collect(self.collector)
+        return conn
+
+    def process(self):
+        """
+        Dispatch events until the collector has none left.
+
+        @return True if at least one event was dispatched
+        """
+        result = False
+        while True:
+            ev = self.collector.peek()
+            if ev:
+                # the event is only popped after it has been dispatched
+                self.dispatch(ev)
+                self.collector.pop()
+                result = True
+            else:
+                return result
+
+    def dispatch(self, event):
+        """Offer event to each handler in turn."""
+        for h in self.handlers:
+            event.dispatch(h)
+
+    @property
+    def empty(self):
+        """True when no event is waiting in the collector and no timer is pending."""
+        # identity test: None is a singleton, so '==' is not wanted here
+        return self.collector.peek() is None and not self.timer.pending
+
+class Names(object):
+    """
+    Assigns each distinct name a stable number: base plus the
+    position at which the name was first seen.
+    """
+    def __init__(self, base=10000):
+        self.names = []
+        self.base = base
+
+    def number(self, name):
+        """Return the number for name, registering the name on first use."""
+        try:
+            position = self.names.index(name)
+        except ValueError:
+            position = len(self.names)
+            self.names.append(name)
+        return position + self.base
+
+class ExtendedEventType(EventType):
+    """
+    Event type identifier for events defined outside the proton-c
+    library
+    """
+    # Registry shared by all instances; supplies a number for any name
+    # that is not given one explicitly.
+    USED = Names()
+
+    def __init__(self, name, number=None):
+        # The handler method name passed to the base class is the event
+        # name prefixed with 'on_'.
+        super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name)
+        self.name = name
+
+class ApplicationEvent(EventBase):
+    """
+    Application defined event, which can optionally be associated with
+    an engine object and or an arbitrary subject
+    """
+    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+        super(ApplicationEvent, self).__init__(PN_PYREF, self, ExtendedEventType(typename))
+        self.connection = connection
+        self.session = session
+        self.link = link
+        self.delivery = delivery
+        # The most specific object supplied wins: a delivery determines
+        # the link, a link the session and a session the connection.
+        if self.delivery:
+            self.link = self.delivery.link
+        if self.link:
+            self.session = self.link.session
+        if self.session:
+            self.connection = self.session.connection
+        self.subject = subject
+
+    def __repr__(self):
+        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
+        # 'typename' is only a local of __init__; the name is read back
+        # from the ExtendedEventType created there.
+        return "%s(%s)" % (self.type.name, ", ".join([str(o) for o in objects if o is not None]))
+
+class StartEvent(ApplicationEvent):
+    """Application event of type 'start' that carries the container it was created for."""
+    def __init__(self, container):
+        super(StartEvent, self).__init__("start")
+        self.container = container
+
+def _min(a, b):
+    """
+    Minimum of a and b in which a false value (e.g. None or 0) means
+    'not set': if only one value is set it is returned, and if a is
+    not set b is returned whatever it is.
+    """
+    if not a:
+        return b
+    if not b:
+        return a
+    return min(a, b)
+
+class SelectLoop(object):
+    """
+    An io loop based on select()
+    """
+    def __init__(self, events):
+        self.events = events
+        # objects providing reading()/writing()/closed()/tick()/
+        # readable()/writable()/removed()/fileno()
+        self.selectables = []
+        self._abort = False
+
+    def abort(self):
+        self._abort = True
+
+    def add(self, selectable):
+        self.selectables.append(selectable)
+
+    def remove(self, selectable):
+        self.selectables.remove(selectable)
+
+    @property
+    def redundant(self):
+        # nothing left to do: no events or timers, and no selectables
+        return self.events.empty and not self.selectables
+
+    @property
+    def aborted(self):
+        return self._abort
+
+    def run(self):
+        """Keep calling do_work() until aborted or until there is nothing left to do."""
+        while not (self._abort or self.redundant):
+            self.do_work()
+
+    def do_work(self, timeout=None):
+        """@return True if some work was done, False if time-out expired"""
+        # NOTE: the early exits on abort and on becoming redundant
+        # return None rather than True/False.
+        tick = self.events.timer.tick()
+        # Drain all events, re-evaluating the timer after each batch
+        # as handlers may have scheduled something new.
+        while self.events.process():
+            if self._abort: return
+            tick = self.events.timer.tick()
+
+        # Rebuild the reading/writing sets until a pass finds nothing
+        # closed, removing closed selectables as they are found.
+        stable = False
+        while not stable:
+            reading = []
+            writing = []
+            closed = []
+            for s in self.selectables:
+                if s.reading(): reading.append(s)
+                if s.writing(): writing.append(s)
+                if s.closed(): closed.append(s)
+                else: tick = _min(tick, s.tick())
+
+            for s in closed:
+                self.selectables.remove(s)
+                s.removed()
+            stable = len(closed) == 0
+
+        if self.redundant:
+            return
+
+        # tick is an absolute time; convert it to a relative timeout
+        # and never wait longer than the caller asked for.
+        if tick:
+            timeout = _min(tick - time.time(), timeout)
+        if timeout and timeout < 0:
+            timeout = 0
+        if reading or writing or timeout:
+            readable, writable, _ = select(reading, writing, [], timeout)
+            for s in self.selectables:
+                s.tick()
+            for s in readable:
+                s.readable()
+            for s in writable:
+                s.writable()
+
+            return bool(readable or writable)
+        else:
+            return False
+
+def delivery_tags():
+    """Endless generator of delivery tags: the strings "1", "2", "3", ..."""
+    count = 0
+    while True:
+        count += 1
+        yield str(count)
+
+def send_msg(sender, msg, tag=None, handler=None, transaction=None):
+    """
+    Send msg on sender and return the resulting delivery.
+
+    The delivery tag is tag if given, otherwise the next value from
+    sender.tags. When a transaction is given the delivery's local
+    state is tied to its id; when a handler is given it becomes the
+    delivery's context.
+    """
+    delivery_tag = tag or next(sender.tags)
+    delivery = sender.delivery(delivery_tag)
+    if transaction:
+        delivery.local.data = [transaction.id]
+        # 0x34: presumably the code for AMQP transactional state -- TODO confirm
+        delivery.update(0x34)
+    if handler:
+        delivery.context = handler
+    sender.send(msg.encode())
+    sender.advance()
+    return delivery
+
+def _send_msg(self, msg, tag=None, handler=None, transaction=None):
+    """Method-style adapter for send_msg(), with self taking the place of the sender."""
+    return send_msg(self, msg, tag=tag, handler=handler, transaction=transaction)
+
+
+class Transaction(object):
+    """
+    Class to track state of an AMQP 1.0 transaction.
+    """
+    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
+        # txn_ctrl: sender on which the declare/discharge messages are
+        # sent; handler: receives the on_transaction_* callbacks.
+        self.txn_ctrl = txn_ctrl
+        self.handler = handler
+        self.id = None
+        # deliveries of the declare and discharge messages, used by
+        # handle_outcome() to tell the two outcomes apart
+        self._declare = None
+        self._discharge = None
+        self.failed = False
+        # accepted deliveries awaiting the outcome of the discharge
+        self._pending = []
+        self.settle_before_discharge = settle_before_discharge
+        # the transaction is declared as soon as it is constructed
+        self.declare()
+
+    def commit(self):
+        self.discharge(False)
+
+    def abort(self):
+        self.discharge(True)
+
+    def declare(self):
+        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
+
+    def discharge(self, failed):
+        # failed=True requests an abort, failed=False a commit
+        self.failed = failed
+        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
+
+    def _send_ctrl(self, descriptor, value):
+        # Sends a described value on the control link and tags the
+        # resulting delivery with this transaction.
+        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler)
+        delivery.transaction = self
+        return delivery
+
+    def accept(self, delivery):
+        """
+        Accept delivery under this transaction; it is settled at once
+        if settle_before_discharge is set, otherwise it is held as
+        pending until the discharge outcome is known.
+        """
+        self.update(delivery, PN_ACCEPTED)
+        if self.settle_before_discharge:
+            delivery.settle()
+        else:
+            self._pending.append(delivery)
+
+    def update(self, delivery, state=None):
+        # Does nothing unless a state is given.
+        if state:
+            delivery.local.data = [self.id, Described(ulong(state), [])]
+            delivery.update(0x34)
+
+    def _release_pending(self):
+        for d in self._pending:
+            d.update(Delivery.RELEASED)
+            d.settle()
+        self._clear_pending()
+
+    def _clear_pending(self):
+        self._pending = []
+
+    def handle_outcome(self, event):
+        """
+        Interpret the outcome of the declare or discharge delivery
+        carried by event and invoke the matching on_transaction_*
+        callback on the handler.
+        """
+        if event.delivery == self._declare:
+            if event.delivery.remote.data:
+                # the first item of the remote data is taken as the transaction id
+                self.id = event.delivery.remote.data[0]
+                self.handler.on_transaction_declared(event)
+            elif event.delivery.remote_state == Delivery.REJECTED:
+                self.handler.on_transaction_declare_failed(event)
+            else:
+                print "Unexpected outcome for declare: %s" % event.delivery.remote_state
+                self.handler.on_transaction_declare_failed(event)
+        elif event.delivery == self._discharge:
+            if event.delivery.remote_state == Delivery.REJECTED:
+                # a rejected abort (self.failed set) raises no callback
+                if not self.failed:
+                    self.handler.on_transaction_commit_failed(event)
+                    self._release_pending() # make this optional?
+            else:
+                if self.failed:
+                    self.handler.on_transaction_aborted(event)
+                    self._release_pending()
+                else:
+                    self.handler.on_transaction_committed(event)
+            # whatever the outcome, pending deliveries are forgotten
+            self._clear_pending()
+
+class LinkOption(object):
+    """
+    Base class for objects that configure a link
+    """
+    def apply(self, link):
+        """
+        Hook for subclasses: perform the actual configuration of the
+        link. The base implementation does nothing.
+        """
+    def test(self, link):
+        """
+        Hook for subclasses: return whether this option should be
+        applied to the given link. The base implementation always
+        says yes.
+        """
+        return True
+
+class AtMostOnce(LinkOption):
+    """Link option that sets the sender settle mode to Link.SND_SETTLED."""
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_SETTLED
+
+class AtLeastOnce(LinkOption):
+    """
+    Link option that sets the sender settle mode to
+    Link.SND_UNSETTLED and the receiver settle mode to Link.RCV_FIRST.
+    """
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_UNSETTLED
+        link.rcv_settle_mode = Link.RCV_FIRST
+
+class SenderOption(LinkOption):
+    """Base for link options that are only applied to sender links."""
+    def apply(self, sender): pass
+    def test(self, link): return link.is_sender
+
+class ReceiverOption(LinkOption):
+    """Base for link options that are only applied to receiver links."""
+    def apply(self, receiver): pass
+    def test(self, link): return link.is_receiver
+
+class Filter(ReceiverOption):
+    """
+    Receiver option that puts a dictionary of filters onto the
+    receiver's source.
+    """
+    def __init__(self, filter_set=None):
+        # A literal {} default would be one dict shared by every
+        # Filter created without arguments; build a fresh one instead.
+        self.filter_set = {} if filter_set is None else filter_set
+
+    def apply(self, receiver):
+        receiver.source.filter.put_dict(self.filter_set)
+
+class Selector(Filter):
+    """
+    Filter consisting of a single message selector, stored under the
+    given name (by default 'selector')
+    """
+    def __init__(self, value, name='selector'):
+        key = symbol(name)
+        selector = Described(symbol('apache.org:selector-filter:string'), value)
+        super(Selector, self).__init__({key: selector})
+
+def _apply_link_options(options, link):
+    """
+    Apply to link every option whose test() accepts it.
+
+    options may be None (or otherwise false), a single option, or a
+    list or tuple of options.
+    """
+    if not options:
+        return
+    # Normalise a single option to a sequence so that one loop handles
+    # every case; tuples are accepted as well as lists.
+    if not isinstance(options, (list, tuple)):
+        options = [options]
+    for o in options:
+        if o.test(link): o.apply(link)
+
+def _create_session(connection, handler=None):
+    """Create and open a new session on connection and return it."""
+    # NOTE: the handler argument is not used by this function.
+    session = connection.session()
+    session.open()
+    return session
+
+
+def _get_attr(target, name):
+    """Return the attribute called name on target, or None if target has no such attribute."""
+    # getattr with a default does the lookup once, where the
+    # hasattr/getattr pair looked the attribute up twice.
+    return getattr(target, name, None)
+
+class SessionPerConnection(object):
+    """
+    Session policy that lazily creates a single session and reuses it
+    for every request until the remote peer closes it.
+    """
+    def __init__(self):
+        self._default_session = None
+
+    def session(self, connection):
+        """Return the shared session, creating and opening it on connection if needed."""
+        if self._default_session:
+            return self._default_session
+        self._default_session = _create_session(connection)
+        self._default_session.context = self
+        return self._default_session
+
+    def on_session_remote_close(self, event):
+        """Close the event's connection and forget the shared session."""
+        event.connection.close()
+        self._default_session = None
+
+class Connector(Handler):
+    """
+    Internal handler that triggers the necessary socket connect for an
+    opened connection.
+    """
+    def __init__(self, loop):
+        # loop must provide add() and schedule(), and expose the
+        # events object as loop.events (see _connect/on_disconnected).
+        self.loop = loop
+
+    def _connect(self, connection):
+        # Take the next (host, port) pair from the connection's
+        # address source and start a socket connect to it.
+        host, port = connection.address.next()
+        #print "connecting to %s:%i" % (host, port)
+        heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None
+        self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
+        connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
+
+    def on_connection_local_open(self, event):
+        # Only connections that were given an address are connected.
+        if hasattr(event.connection, "address"):
+            self._connect(event.connection)
+
+    def on_connection_remote_open(self, event):
+        # The peer has opened the connection: start the reconnect
+        # strategy over.
+        if hasattr(event.connection, "reconnect"):
+            event.connection.reconnect.reset()
+
+    def on_disconnected(self, event):
+        # With a reconnect strategy: reconnect at once if it returns a
+        # zero delay, otherwise schedule a timer event for the retry.
+        if hasattr(event.connection, "reconnect"):
+            event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference
+            delay = event.connection.reconnect.next()
+            if delay == 0:
+                print "Disconnected, reconnecting..."
+                self._connect(event.connection)
+            else:
+                print "Disconnected will try to reconnect after %s seconds" % delay
+                self.loop.schedule(time.time() + delay, connection=event.connection, subject=self)
+        else:
+            print "Disconnected"
+
+    def on_timer(self, event):
+        # Only react to timers this connector scheduled itself
+        # (subject is self) for a connection.
+        if event.subject == self and event.connection:
+            self._connect(event.connection)
+
+class Backoff(object):
+    """
+    A reconnect strategy involving an increasing delay between
+    retries, up to a maximum of 10 seconds.
+    """
+    def __init__(self):
+        self.delay = 0
+
+    def reset(self):
+        """Start over, so that the next delay returned is 0 again."""
+        self.delay = 0
+
+    def next(self):
+        """
+        Return the delay to use now and advance to the following one:
+        0, then 0.1, then doubling each time up to the cap of 10.
+        """
+        current = self.delay
+        self.delay = 0.1 if current == 0 else min(10, 2*current)
+        return current
+
+class Urls(object):
+    """
+    Endless round-robin iterator over a list of urls, yielding a
+    (host, port) pair for each in turn.
+    """
+    def __init__(self, values):
+        self.values = [Url(v) for v in values]
+        self.i = iter(self.values)
+
+    def __iter__(self):
+        return self
+
+    def _as_pair(self, url):
+        return (url.host, url.port)
+
+    def next(self):
+        """Return the next (host, port) pair, wrapping around at the end of the list."""
+        try:
+            url = self.i.next()
+        except StopIteration:
+            # end of the list reached: start again from the first url
+            self.i = iter(self.values)
+            url = self.i.next()
+        return self._as_pair(url)
+
+class Container(object):
+    """
+    Ties together event dispatch (Events) and the io loop
+    (SelectLoop), and offers convenience methods for creating
+    connections, links, transactions, listeners and timers.
+    """
+    def __init__(self, *handlers):
+        # The Connector and ScopedHandler always come before the
+        # handlers supplied by the application.
+        h = [Connector(self), ScopedHandler()]
+        h.extend(nested_handlers(handlers))
+        self.events = Events(*h)
+        self.loop = SelectLoop(self.events)
+        self.trigger = None
+        self.container_id = str(generate_uuid())
+
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
+        """
+        Create and open a connection.
+
+        Exactly one source of addresses is used, in this order of
+        precedence: url (a single url), urls (a list of urls) or
+        address (an object whose next() returns (host, port)).
+        reconnect may be a reconnect strategy; None selects the
+        default Backoff and any other false value disables
+        reconnecting.
+
+        Raises ValueError if none of url, urls or address is given.
+        """
+        conn = self.events.connection()
+        conn._pin = conn #circular reference until the open event gets handled
+        if handler:
+            conn.context = handler
+        conn.container = self.container_id or str(generate_uuid())
+        conn.heartbeat = heartbeat
+        if url: conn.address = Urls([url])
+        elif urls: conn.address = Urls(urls)
+        elif address: conn.address = address
+        else: raise ValueError("One of url, urls or address required")
+        if reconnect:
+            conn.reconnect = reconnect
+        elif reconnect is None:
+            conn.reconnect = Backoff()
+        conn._session_policy = SessionPerConnection() #todo: make configurable
+        conn.open()
+        return conn
+
+    def _get_id(self, container, remote, local):
+        """
+        Build a link name from the container id and whichever of the
+        remote and local addresses are set; a uuid is used when
+        neither is.
+        """
+        # every branch must return: a branch without 'return' would
+        # hand None back to the caller as the link name
+        if local and remote: return "%s-%s-%s" % (container, remote, local)
+        elif local: return "%s-%s" % (container, local)
+        elif remote: return "%s-%s" % (container, remote)
+        else: return "%s-%s" % (container, str(generate_uuid()))
+
+    def _get_session(self, context):
+        """
+        Resolve context to a session: a Url is connected to first, a
+        Session is used as is, a Connection goes through its session
+        policy (if any), and anything else is asked for a session().
+        """
+        if isinstance(context, Url):
+            return self._get_session(self.connect(url=context))
+        elif isinstance(context, Session):
+            return context
+        elif isinstance(context, Connection):
+            if hasattr(context, '_session_policy'):
+                return context._session_policy.session(context)
+            else:
+                return _create_session(context)
+        else:
+            return context.session()
+
+    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
+        """
+        Create and open a sending link.
+
+        context may be a url (string or Url), in which case its path
+        is the default target, or anything _get_session() accepts.
+        The returned sender gains a tags generator and a send_msg()
+        method.
+        """
+        if isinstance(context, basestring):
+            context = Url(context)
+        if isinstance(context, Url) and not target:
+            target = context.path
+        session = self._get_session(context)
+        snd = session.sender(name or self._get_id(session.connection.container, target, source))
+        if source:
+            snd.source.address = source
+        if target:
+            snd.target.address = target
+        if handler:
+            snd.context = handler
+        snd.tags = tags or delivery_tags()
+        snd.send_msg = types.MethodType(_send_msg, snd)
+        _apply_link_options(options, snd)
+        snd.open()
+        return snd
+
+    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
+        """
+        Create and open a receiving link.
+
+        context may be a url (string or Url), in which case its path
+        is the default source, or anything _get_session() accepts.
+        """
+        if isinstance(context, basestring):
+            context = Url(context)
+        if isinstance(context, Url) and not source:
+            source = context.path
+        session = self._get_session(context)
+        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
+        if source:
+            rcv.source.address = source
+        if dynamic:
+            rcv.source.dynamic = True
+        if target:
+            rcv.target.address = target
+        if handler:
+            rcv.context = handler
+        _apply_link_options(options, rcv)
+        rcv.open()
+        return rcv
+
+    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
+        """
+        Declare a new transaction, creating the 'txn-ctrl' sender on
+        context the first time and reusing it afterwards.
+        """
+        if not _get_attr(context, '_txn_ctrl'):
+            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl')
+            context._txn_ctrl.target.type = Terminus.COORDINATOR
+            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
+
+    def listen(self, url):
+        """Start accepting incoming connections on the host and port of url."""
+        host, port = Urls([url]).next()
+        return AmqpAcceptor(self.events, self, host, port)
+
+    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+        """Schedule a 'timer' application event for the given deadline."""
+        self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+    def get_event_trigger(self):
+        """Return the EventInjector for this container, creating a new one if there is none or it is closed."""
+        if not self.trigger or self.trigger.closed():
+            self.trigger = EventInjector(self.events.collector)
+            self.add(self.trigger)
+        return self.trigger
+
+    def add(self, selectable):
+        self.loop.add(selectable)
+
+    def remove(self, selectable):
+        self.loop.remove(selectable)
+
+    def run(self):
+        """Dispatch the start event, then run the loop until it finishes or is stopped."""
+        self.events.dispatch(StartEvent(self))
+        self.loop.run()
+
+    def stop(self):
+        self.loop.abort()
+
+    def do_work(self, timeout=None):
+        return self.loop.do_work(timeout)
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
new file mode 100644
index 0000000..03c9417
--- /dev/null
+++ b/proton-c/bindings/python/proton/utils.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import Queue, socket, time
+from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
+from proton.handlers import ScopedHandler
+
+class BlockingLink(object):
+    """
+    Synchronous wrapper around a link: construction blocks until the
+    link is no longer remotely uninitialised, and close() blocks
+    until it is no longer remotely active.
+    """
+    def __init__(self, connection, link):
+        self.connection = connection
+        self.link = link
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+                             msg="Opening link %s" % link.name)
+
+    def close(self):
+        """Close the link and wait until it is no longer remotely active."""
+        # The close has to be initiated locally before waiting on the
+        # peer, mirroring BlockingConnection.close().
+        self.link.close()
+        # wait() calls its condition repeatedly, so it must be given a
+        # callable rather than the result of evaluating the test once.
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
+                             msg="Closing link %s" % self.link.name)
+
+    # Access to other link attributes.
+    def __getattr__(self, name): return getattr(self.link, name)
+
+class BlockingSender(BlockingLink):
+    """Blocking wrapper around a sender link."""
+    def __init__(self, connection, sender):
+        super(BlockingSender, self).__init__(connection, sender)
+
+    def send_msg(self, msg):
+        """Send msg and block until its delivery has been settled."""
+        delivery = send_msg(self.link, msg)
+        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
+
+class BlockingReceiver(BlockingLink):
+    """Blocking wrapper around a receiver link."""
+    def __init__(self, connection, receiver, credit=1):
+        super(BlockingReceiver, self).__init__(connection, receiver)
+        # credit is issued only once the base constructor has finished
+        # waiting for the link; a false credit issues none.
+        if credit: receiver.flow(credit)
+
+class BlockingConnection(Handler):
+    """
+    A synchronous style connection wrapper.
+    """
+    def __init__(self, url, timeout=None, container=None):
+        # timeout is the default used by wait(); None means wait
+        # indefinitely. A new Container is created unless one is given.
+        self.timeout = timeout
+        self.container = container or Container()
+        if isinstance(url, basestring):
+            self.url = Url(url)
+        else:
+            self.url = url
+        # This object is registered as the connection's handler, so
+        # the on_* methods below are called for its events.
+        self.conn = self.container.connect(url=self.url, handler=self)
+        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+                  msg="Opening connection")
+
+    def create_sender(self, address, handler=None):
+        """Create a sender for address and block until the link is opened."""
+        return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
+
+    def create_receiver(self, address, credit=1, dynamic=False, handler=None):
+        """Create a receiver for address and block until the link is opened."""
+        return BlockingReceiver(
+            self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
+
+    def close(self):
+        """Close the connection and block until it is no longer remotely active."""
+        self.conn.close()
+        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+                  msg="Closing connection")
+
+    def run(self):
+        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+        self.container.run()
+
+    def wait(self, condition, timeout=False, msg=None):
+        """Call do_work until condition() is true"""
+        # timeout=False (the default) means 'use self.timeout';
+        # timeout=None means wait without a deadline. With a deadline,
+        # Timeout is raised as soon as do_work() returns a false value
+        # while the condition is still unmet.
+        if timeout is False:
+            timeout = self.timeout
+        if timeout is None:
+            while not condition():
+                self.container.do_work()
+        else:
+            deadline = time.time() + timeout
+            while not condition():
+                if not self.container.do_work(deadline - time.time()):
+                    txt = "Connection %s timed out" % self.url
+                    if msg: txt += ": " + msg
+                    raise Timeout(txt)
+
+    def on_link_remote_close(self, event):
+        # A remote close while the link is still locally active is
+        # reported as an error.
+        if event.link.state & Endpoint.LOCAL_ACTIVE:
+            self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        # A remote close while the connection is still locally active
+        # is reported as an error.
+        if event.connection.state & Endpoint.LOCAL_ACTIVE:
+            self.closed(event.connection.remote_condition)
+
+    def on_disconnected(self, event):
+        raise ConnectionException("Connection %s disconnected" % self.url);
+
+    def closed(self, error=None):
+        """Raise ConnectionException describing why the connection was closed."""
+        txt = "Connection %s closed" % self.url
+        if error:
+            txt += " due to: %s" % error
+        else:
+            txt += " by peer"
+        raise ConnectionException(txt)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: Initial set of engine examples along with supporting additions to the library.

Posted by gs...@apache.org.
Initial set of engine examples along with supporting additions to the library.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/34e64e32
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/34e64e32
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/34e64e32

Branch: refs/heads/master
Commit: 34e64e3248b67971b701d990709920fa690c12b4
Parents: 9a72a30
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Dec 5 20:49:54 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Fri Dec 12 11:04:53 2014 +0000

----------------------------------------------------------------------
 examples/engine/py/README                       | 155 ++++
 examples/engine/py/abstract_server.py           |  35 +
 examples/engine/py/client.py                    |  59 ++
 examples/engine/py/client_http.py               | 110 +++
 examples/engine/py/common.py                    | 699 ++++++++++++++++
 examples/engine/py/db_common.py                 |  93 +++
 examples/engine/py/db_ctrl.py                   |  46 ++
 examples/engine/py/db_recv.py                   |  54 ++
 examples/engine/py/db_send.py                   |  85 ++
 examples/engine/py/helloworld.py                |  45 +
 examples/engine/py/helloworld_blocking.py       |  35 +
 examples/engine/py/helloworld_direct.py         |  47 ++
 examples/engine/py/helloworld_direct_tornado.py |  52 ++
 examples/engine/py/helloworld_tornado.py        |  49 ++
 examples/engine/py/proton_server.py             |  61 ++
 examples/engine/py/proton_tornado.py            |  70 ++
 examples/engine/py/recurring_timer.py           |  43 +
 examples/engine/py/recurring_timer_tornado.py   |  44 +
 examples/engine/py/selected_recv.py             |  40 +
 examples/engine/py/server.py                    |  56 ++
 examples/engine/py/server_tx.py                 |  77 ++
 examples/engine/py/simple_recv.py               |  40 +
 examples/engine/py/simple_send.py               |  53 ++
 examples/engine/py/sync_client.py               |  88 ++
 examples/engine/py/tx_recv.py                   |  61 ++
 examples/engine/py/tx_recv_interactive.py       |  83 ++
 examples/engine/py/tx_send.py                   |  75 ++
 examples/engine/py/tx_send_sync.py              |  76 ++
 proton-c/bindings/python/CMakeLists.txt         |  20 +-
 proton-c/bindings/python/proton/__init__.py     |  63 +-
 proton-c/bindings/python/proton/handlers.py     | 440 ++++++++++
 proton-c/bindings/python/proton/reactors.py     | 827 +++++++++++++++++++
 proton-c/bindings/python/proton/utils.py        | 114 +++
 33 files changed, 3870 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/README
----------------------------------------------------------------------
diff --git a/examples/engine/py/README b/examples/engine/py/README
new file mode 100644
index 0000000..a361fc4
--- /dev/null
+++ b/examples/engine/py/README
@@ -0,0 +1,155 @@
+Most (though not all) of the current examples require a broker or
+similar intermediary that supports the AMQP 1.0 protocol, allows
+anonymous connections and accepts links to and from a node named
+'examples'.
+
+------------------------------------------------------------------
+
+helloworld.py
+
+Basic example that connects to an intermediary on localhost:5672,
+establishes a subscription from the 'examples' node on that
+intermediary, then creates a sending link to the same node and sends
+one message. On receiving the message back via the subscription, the
+connection is closed.
+
+helloworld_blocking.py
+
+The same as the basic helloworld.py, but using a
+synchronous/sequential style wrapper on top of the
+asynchronous/reactive API. The purpose of this example is just to show
+how different functionality can be easily layered should it be
+desired.
+
+helloworld_direct.py
+
+A variant of the basic helloworld example, that does not use an
+intermediary, but listens for incoming connections itself. It
+establishes a connection to itself with a link over which a single
+message is sent. This demonstrates the ease with which a simple daemon
+can be built using the API.
+
+helloworld_tornado.py
+helloworld_direct_tornado.py
+
+These are variants of the helloworld.py and helloworld_direct.py
+examples that use the event loop from the tornado library, rather than
+that provided within proton itself and demonstrate how proton can be
+used with external loops.
+
+-------------------------------------------------------------------
+
+simple_send.py
+
+An example of sending a fixed number of messages and tracking their
+(asynchronous) acknowledgement. Handles disconnection while
+maintaining an at-least-once guarantee (there may be duplicates, but
+no message in the sequence should be lost). Messages are sent through
+the 'examples' node on an intermediary accessible on port 5672 on
+localhost.
+
+simple_recv.py
+
+Subscribes to the 'examples' node on an intermediary accessible on port 5672 on
+localhost. Simply prints out the body of received messages.
+
+db_send.py
+
+A more realistic sending example, where the messages come from records
+in a simple database table. On being acknowledged the records can be
+deleted from the table. The database access is done in a separate
+thread, so as not to block the event thread during data
+access. Messages are sent through the 'examples' node on an
+intermediary accessible on port 5672 on localhost.
+
+db_recv.py
+
+A receiving example that records messages received from the 'examples'
+node on localhost:5672 in a database table and only acknowledges them
+when the insert completes. Database access is again done in a separate
+thread from the event loop.
+
+db_ctrl.py
+
+A utility for setting up the database tables for the two examples
+above. Takes two arguments, the action to perform and the name of the
+database on which to perform it. The database used by db_send.py is
+src_db, that by db_recv.py is dst_db. The valid actions are 'init',
+which creates the table, 'list' which displays the contents and
+'insert' which inserts records from standard-in and is used to
+populate src_db, e.g. for i in `seq 1 50`; do echo "Message-$i"; done
+| ./db_ctrl.py insert src_db.
+
+tx_send.py
+
+A sender that sends messages in atomic batches using local
+transactions (this example does not persist the messages in any way).
+
+tx_send_sync.py
+
+A variant of the former example that waits for all messages in a batch
+to be acknowledged before committing. Used only to work around an
+ordering issue in proton that affected qpidd.
+
+tx_recv.py
+
+A receiver example that accepts batches of messages using local
+transactions.
+
+tx_recv_interactive.py
+
+A testing utility that allows interactive control of the
+transactions. Actions are keyed in to the console, 'fetch' will
+request another message, 'abort' will abort the transaction, 'commit'
+will commit it.
+
+The various send/recv examples can be mixed and matched if desired.
+
+-------------------------------------------------------------------
+
+client.py
+
+The client part of a request-response example. Sends requests and
+prints out responses. Requires an intermediary that supports the AMQP
+1.0 dynamic nodes on which the responses are received. The requests
+are sent through the 'examples' node.
+
+server.py
+
+The server part of a request-response example, that receives requests
+via the examples node, converts the body to uppercase and sends the
+result back to the indicated reply address.
+
+sync_client.py
+
+A variant of the client part, that uses a blocking/synchronous style
+instead of the reactive/asynchronous style.
+
+client_http.py
+
+A variant of the client part that takes the input to be submitted in
+the request over HTTP (point your browser to localhost:8888/client).
+
+server_tx.py
+
+A variant of the server part that consumes the request and sends out
+the response atomically in a local transaction.
+
+-------------------------------------------------------------------
+
+selected_recv.py
+
+An example that uses a selector filter.
+
+-------------------------------------------------------------------
+
+recurring_timer.py
+
+An example showing a simple timer event.
+
+recurring_timer_tornado.py
+
+A variant of the above that uses the tornado eventloop instead.
+
+-------------------------------------------------------------------
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/abstract_server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/abstract_server.py b/examples/engine/py/abstract_server.py
new file mode 100755
index 0000000..2d0de32
--- /dev/null
+++ b/examples/engine/py/abstract_server.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton_server import Server
+
+class Application(Server):
+    def __init__(self, host, address):
+        super(Application, self).__init__(host, address)
+
+    def on_request(self, request, reply_to):
+        response = request.upper()
+        self.send(response, reply_to)
+        print "Request from: %s" % reply_to
+
+try:
+    Application("localhost:5672", "examples").run()
+except KeyboardInterrupt: pass
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py
new file mode 100755
index 0000000..d1e2706
--- /dev/null
+++ b/examples/engine/py/client.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Client(MessagingHandler):
+    def __init__(self, host, address, requests):
+        super(Client, self).__init__()
+        self.host = host
+        self.address = address
+        self.requests = requests
+
+    def on_start(self, event):
+        self.conn = event.container.connect(self.host)
+        self.sender = event.container.create_sender(self.conn, self.address)
+        self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)
+
+    def next_request(self):
+        if self.receiver.remote_source.address:
+            req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
+            self.sender.send_msg(req)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.next_request()
+
+    def on_message(self, event):
+        print "%s => %s" % (self.requests.pop(0), event.message.body)
+        if self.requests:
+            self.next_request()
+        else:
+            self.conn.close()
+
+REQUESTS= ["Twas brillig, and the slithy toves",
+           "Did gire and gymble in the wabe.",
+           "All mimsy were the borogroves,",
+           "And the mome raths outgrabe."]
+
+Container(Client("localhost:5672", "examples", REQUESTS)).run()
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
new file mode 100755
index 0000000..5202f8d
--- /dev/null
+++ b/examples/engine/py/client_http.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+from tornado.ioloop import IOLoop
+import tornado.web
+
+class Client(MessagingHandler):
+    def __init__(self, host, address):
+        super(Client, self).__init__()
+        self.host = host
+        self.address = address
+        self.sent = []
+        self.pending = []
+        self.reply_address = None
+        self.sender = None
+        self.receiver = None
+
+    def on_start(self, event):
+        conn = event.container.connect(self.host)
+        self.sender = event.container.create_sender(conn, self.address)
+        self.receiver = event.container.create_receiver(conn, None, dynamic=True)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.reply_address = event.link.remote_source.address
+            self.do_request()
+
+    def on_credit(self, event):
+        self.do_request()
+
+    def on_message(self, event):
+        if self.sent:
+            request, handler = self.sent.pop(0)
+            print "%s => %s" % (request, event.message.body)
+            handler(event.message.body)
+            self.do_request()
+
+    def do_request(self):
+        if self.pending and self.reply_address and self.sender.credit:
+            request, handler = self.pending.pop(0)
+            self.sent.append((request, handler))
+            req = Message(reply_to=self.reply_address, body=request)
+            self.sender.send_msg(req)
+
+    def request(self, body, handler):
+        self.pending.append((body, handler))
+        self.do_request()
+
+class ExampleHandler(tornado.web.RequestHandler):
+    def initialize(self, client):
+        self.client = client
+
+    def get(self):
+        self._write_open()
+        self._write_form()
+        self._write_close()
+
+    @tornado.web.asynchronous
+    def post(self):
+        client.request(self.get_body_argument("message"), lambda x: self.on_response(x))
+
+    def on_response(self, body):
+        self.set_header("Content-Type", "text/html")
+        self._write_open()
+        self._write_form()
+        self.write("Response: " + body)
+        self._write_close()
+        self.finish()
+
+    def _write_open(self):
+        self.write('<html><body>')
+
+    def _write_close(self):
+        self.write('</body></html>')
+
+    def _write_form(self):
+        self.write('<form action="/client" method="POST">'
+                   'Request: <input type="text" name="message">'
+                   '<input type="submit" value="Submit">'
+                   '</form>')
+
+
+client = Client("localhost:5672", "examples")
+loop = TornadoLoop(client)
+app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))])
+app.listen(8888)
+try:
+    loop.run()
+except KeyboardInterrupt:
+    loop.stop()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/common.py b/examples/engine/py/common.py
new file mode 100644
index 0000000..d4d9a69
--- /dev/null
+++ b/examples/engine/py/common.py
@@ -0,0 +1,699 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import errno, os, random, select, time, traceback
+from proton import *
+from socket import *
+from threading import Thread
+from heapq import heappush, heappop, nsmallest
+
+class Selectable:
+
+    def __init__(self, transport, socket):
+        self.transport = transport
+        self.socket = socket
+        self.write_done = False
+        self.read_done = False
+
+    def closed(self):
+        if self.write_done and self.read_done:
+            self.socket.close()
+            return True
+        else:
+            return False
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def reading(self):
+        if self.read_done: return False
+        c = self.transport.capacity()
+        if c > 0:
+            return True
+        elif c < 0:
+            self.read_done = True
+            return False
+
+    def writing(self):
+        if self.write_done: return False
+        try:
+            p = self.transport.pending()
+            if p > 0:
+                return True
+            elif p < 0:
+                self.write_done = True
+                return False
+        except TransportException, e:
+            self.write_done = True
+            return False
+
+    def readable(self):
+        c = self.transport.capacity()
+        if c > 0:
+            try:
+                data = self.socket.recv(c)
+                if data:
+                    self.transport.push(data)
+                else:
+                    self.transport.close_tail()
+            except error, e:
+                print "read error", e
+                self.transport.close_tail()
+                self.read_done = True
+        elif c < 0:
+            self.read_done = True
+
+    def writable(self):
+        try:
+            p = self.transport.pending()
+            if p > 0:
+                data = self.transport.peek(p)
+                n = self.socket.send(data)
+                self.transport.pop(n)
+            elif p < 0:
+                self.write_done = True
+        except error, e:
+            print "write error", e
+            self.transport.close_head()
+            self.write_done = True
+
+    def tick(self, now):
+        return self.transport.tick(now)
+
+class Acceptor:
+
+    def __init__(self, driver, host, port):
+        self.driver = driver
+        self.socket = socket()
+        self.socket.setblocking(0)
+        self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+        self.socket.bind((host, port))
+        self.socket.listen(5)
+        self.driver.add(self)
+
+    def closed(self):
+        return False
+
+    def fileno(self):
+        return self.socket.fileno()
+
+    def reading(self):
+        return True
+
+    def writing(self):
+        return False
+
+    def readable(self):
+        sock, addr = self.socket.accept()
+        sock.setblocking(0)
+        print "Incoming Connection:", addr
+        if sock:
+            conn = Connection()
+            conn.collect(self.driver.collector)
+            transport = Transport()
+            transport.bind(conn)
+            sasl = transport.sasl()
+            sasl.mechanisms("ANONYMOUS")
+            sasl.server()
+            sasl.done(SASL.OK)
+            sel = Selectable(transport, sock)
+            self.driver.add(sel)
+
+    def tick(self, now):
+        return None
+
+class Interrupter:
+
+    def __init__(self):
+        self.read, self.write = os.pipe()
+
+    def fileno(self):
+        return self.read
+
+    def readable(self):
+        os.read(self.read, 1024)
+
+    def interrupt(self):
+        os.write(self.write, 'x')
+
+class PQueue:
+
+    def __init__(self):
+        self.entries = []
+
+    def add(self, task, priority):
+        heappush(self.entries, (priority, task))
+
+    def peek(self):
+        if self.entries:
+            return nsmallest(1, self.entries)[0]
+        else:
+            return None
+
+    def pop(self):
+        if self.entries:
+            return heappop(self.entries)
+        else:
+            return None
+
+    def __nonzero__(self):
+        if self.entries:
+            return True
+        else:
+            return False
+
+import proton
+
+TIMER_EVENT = proton.EventType(10000, "on_timer")
+
+class Timer:
+
+    def __init__(self, collector):
+        self.collector = collector
+        self.tasks = PQueue()
+
+    def schedule(self, task, deadline):
+        self.tasks.add(task, deadline)
+
+    def tick(self, now):
+        while self.tasks:
+            deadline, task = self.tasks.peek()
+            if now > deadline:
+                self.tasks.pop()
+                self.collector.put(task, TIMER_EVENT)
+            else:
+                return deadline
+
+def _dispatch(ev, handler):
+    if ev.clazz == "pn_delivery" and ev.context.released:
+        return
+    else:
+        ev.dispatch(handler)
+
+def _expand(handlers):
+    result = []
+    for h in handlers:
+        if hasattr(h, "handlers"):
+            result.extend(h.handlers)
+        else:
+            result.append(h)
+    return result
+
+class Driver(Handler):
+
+    def __init__(self, *handlers):
+        self.collector = Collector()
+        self.handlers = _expand(handlers)
+        self.interrupter = Interrupter()
+        self.timer = Timer(self.collector)
+        self.selectables = []
+        self.now = None
+        self.deadline = None
+        self._abort = False
+        self._exit = False
+        self._thread = Thread(target=self.run)
+        self._thread.setDaemon(True)
+
+    def schedule(self, task, timeout):
+        self.timer.schedule(task, self.now + timeout)
+
+    def abort(self):
+        self._abort = True
+
+    def exit(self):
+        self._exit = True
+
+    def wakeup(self):
+        self.interrupter.interrupt()
+
+    def start(self):
+        self._thread.start()
+
+    def join(self):
+        self._thread.join()
+
+    def _init_deadline(self):
+        self.now = time.time()
+        self.deadline = None
+
+    def _update_deadline(self, t):
+        if t is None or t < self.now: return
+        if self.deadline is None or t < self.deadline:
+            self.deadline = t
+
+    @property
+    def _timeout(self):
+        if self.deadline is None:
+            return None
+        else:
+            return self.deadline - self.now
+
+    def run(self):
+        self._init_deadline()
+        for h in self.handlers:
+            dispatch(h, "on_start", self)
+
+        while True:
+            self._init_deadline()
+
+            while True:
+                self.process_events()
+                if self._abort: return
+                self._update_deadline(self.timer.tick(self.now))
+                count = self.process_events()
+                if self._abort: return
+                if not count:
+                    break
+
+            reading = [self.interrupter]
+            writing = []
+
+            for s in self.selectables[:]:
+                if s.reading(): reading.append(s)
+                if s.writing(): writing.append(s)
+                self._update_deadline(s.tick(self.now))
+                if s.closed(): self.selectables.remove(s)
+
+            if self._exit and not self.selectables: return
+
+            try:
+                readable, writable, _ = select.select(reading, writing, [], self._timeout)
+            except select.error, (err, errtext):
+                if err == errno.EINTR:
+                    continue
+                else:
+                    raise
+
+            for s in readable:
+                s.readable()
+            for s in writable:
+                s.writable()
+
+    def process_events(self):
+        count = 0
+
+        quiesced = False
+        while True:
+            ev = self.collector.peek()
+            if ev:
+                count += 1
+                quiesced = False
+                _dispatch(ev, self)
+                for h in self.get_handlers(ev.context):
+                    _dispatch(ev, h)
+                self.collector.pop()
+            elif quiesced:
+                return count
+            else:
+                for h in self.handlers:
+                    dispatch(h, "on_quiesced", self)
+                quiesced = True
+
+        return count
+
+    getters = {
+        Transport: lambda x: x.connection,
+        Delivery: lambda x: x.link,
+        Sender: lambda x: x.session,
+        Receiver: lambda x: x.session,
+        Session: lambda x: x.connection,
+    }
+
+    def get_handlers(self, context):
+        if hasattr(context, "handlers"):
+            return context.handlers
+        elif context.__class__ in self.getters:
+            parent = self.getters[context.__class__](context)
+            return self.get_handlers(parent)
+        else:
+            return self.handlers
+
+    def on_connection_local_open(self, event):
+        conn = event.context
+        if conn.state & Endpoint.REMOTE_UNINIT:
+            self._connect(conn)
+
+    def _connect(self, conn):
+        transport = Transport()
+        transport.idle_timeout = 300
+        sasl = transport.sasl()
+        sasl.mechanisms("ANONYMOUS")
+        sasl.client()
+        transport.bind(conn)
+        sock = socket()
+        sock.setblocking(0)
+        hostport = conn.hostname.split(":", 1)
+        host = hostport[0]
+        if len(hostport) > 1:
+            port = int(hostport[1])
+        else:
+            port = 5672
+        sock.connect_ex((host, port))
+        selectable = Selectable(transport, sock)
+        self.add(selectable)
+
+    def on_timer(self, event):
+        event.context()
+
+    def connection(self, *handlers):
+        conn = Connection()
+        if handlers:
+            conn.handlers = _expand(handlers)
+        conn.collect(self.collector)
+        return conn
+
+    def acceptor(self, host, port):
+        return Acceptor(self, host, port)
+
+    def add(self, selectable):
+        self.selectables.append(selectable)
+
+class Handshaker(Handler):
+
+    def on_connection_remote_open(self, event):
+        conn = event.context
+        if conn.state & Endpoint.LOCAL_UNINIT:
+            conn.open()
+
+    def on_session_remote_open(self, event):
+        ssn = event.context
+        if ssn.state & Endpoint.LOCAL_UNINIT:
+            ssn.open()
+
+    def on_link_remote_open(self, event):
+        link = event.context
+        if link.state & Endpoint.LOCAL_UNINIT:
+            link.source.copy(link.remote_source)
+            link.target.copy(link.remote_target)
+            link.open()
+
+    def on_connection_remote_close(self, event):
+        conn = event.context
+        if not (conn.state & Endpoint.LOCAL_CLOSED):
+            conn.close()
+
+    def on_session_remote_close(self, event):
+        ssn = event.context
+        if not (ssn.state & Endpoint.LOCAL_CLOSED):
+            ssn.close()
+
+    def on_link_remote_close(self, event):
+        link = event.context
+        if not (link.state & Endpoint.LOCAL_CLOSED):
+            link.close()
+
+class FlowController(Handler):
+
+    def __init__(self, window):
+        self.window = window
+
+    def top_up(self, link):
+        delta = self.window - link.credit
+        link.flow(delta)
+
+    def on_link_local_open(self, event):
+        link = event.context
+        if link.is_receiver:
+            self.top_up(link)
+
+    def on_link_remote_open(self, event):
+        link = event.context
+        if link.is_receiver:
+            self.top_up(link)
+
+    def on_link_flow(self, event):
+        link = event.context
+        if link.is_receiver:
+            self.top_up(link)
+
+    def on_delivery(self, event):
+        delivery = event.context
+        if delivery.link.is_receiver:
+            self.top_up(delivery.link)
+
+class Row:
+
+    def __init__(self):
+        self.links = set()
+
+    def add(self, link):
+        self.links.add(link)
+
+    def discard(self, link):
+        self.links.discard(link)
+
+    def choose(self):
+        if self.links:
+            return random.choice(list(self.links))
+        else:
+            return None
+
+    def __iter__(self):
+        return iter(self.links)
+
+    def __nonzero__(self):
+        return bool(self.links)
+
+
+class Router(Handler):
+
+    EMPTY = Row()
+
+    def __init__(self):
+        self._outgoing = {}
+        self._incoming = {}
+
+    def incoming(self, address):
+        return self._incoming.get(address, self.EMPTY)
+
+    def outgoing(self, address):
+        return self._outgoing.get(address, self.EMPTY)
+
+    def address(self, link):
+        if link.is_sender:
+            return link.source.address or link.target.address
+        else:
+            return link.target.address
+
+    def table(self, link):
+        if link.is_sender:
+            return self._outgoing
+        else:
+            return self._incoming
+
+    def add(self, link):
+        address = self.address(link)
+        table = self.table(link)
+        row = table.get(address)
+        if row is None:
+            row = Row()
+            table[address] = row
+        row.add(link)
+
+    def remove(self, link):
+        address = self.address(link)
+        table = self.table(link)
+        row = table.get(address)
+        if row is not None:
+            row.discard(link)
+            if not row:
+                del table[address]
+
+    def on_link_local_open(self, event):
+        self.add(event.context)
+
+    def on_link_local_close(self, event):
+        self.remove(event.context)
+
+    def on_link_final(self, event):
+        self.remove(event.context)
+
+class Pool(Handler):
+
+    def __init__(self, collector, router=None):
+        self.collector = collector
+        self._connections = {}
+        if router:
+            self.outgoing_resolver = lambda address: router.outgoing(address).choose()
+            self.incoming_resolver = lambda address: router.incoming(address).choose()
+        else:
+            self.outgoing_resolver = lambda address: None
+            self.incoming_resolver = lambda address: None
+
+    def resolve(self, remote, local, resolver, constructor):
+        link = resolver(remote)
+        if link is None:
+            host = remote[2:].split("/", 1)[0]
+            conn = self._connections.get(host)
+            if conn is None:
+                conn = Connection()
+                conn.collect(self.collector)
+                conn.hostname = host
+                conn.open()
+                self._connections[host] = conn
+
+            ssn = conn.session()
+            ssn.open()
+            link = constructor(ssn, remote, local)
+            link.open()
+        return link
+
+    def on_transport_closed(self, event):
+        transport =  event.context
+        conn = transport.connection
+        del self._connections[conn.hostname]
+
+    def outgoing(self, target, source=None):
+        return self.resolve(target, source, self.outgoing_resolver, self.new_outgoing)
+
+    def incoming(self, source, target=None):
+        return self.resolve(source, target, self.incoming_resolver, self.new_incoming)
+
+    def new_outgoing(self, ssn, remote, local):
+        snd = ssn.sender("%s-%s" % (local, remote))
+        snd.source.address = local
+        snd.target.address = remote
+        return snd
+
+    def new_incoming(self, ssn, remote, local):
+        rcv = ssn.receiver("%s-%s" % (remote, local))
+        rcv.source.address = remote
+        rcv.target.address = local
+        return rcv
+
+class MessageDecoder(Handler):
+
+    def __init__(self, delegate):
+        self.__delegate = delegate
+
+    def on_start(self, drv):
+        try:
+            self.__delegate
+        except AttributeError:
+            self.__delegate = self
+        self.__message = Message()
+
+    def on_delivery(self, event):
+        dlv = event.context
+        if dlv.link.is_receiver and not dlv.partial:
+            encoded = dlv.link.recv(dlv.pending)
+            self.__message.decode(encoded)
+            try:
+                dispatch(self.__delegate, "on_message", dlv.link, self.__message)
+                dlv.update(Delivery.ACCEPTED)
+            except:
+                dlv.update(Delivery.REJECTED)
+                traceback.print_exc()
+            dlv.settle()
+
+class Address:
+
+    def __init__(self, st):
+        self.st = st
+
+    @property
+    def host(self):
+        return self.st[2:].split("/", 1)[0]
+
+    @property
+    def path(self):
+        parts = self.st[2:].split("/", 1)
+        if len(parts) == 2:
+            return parts[1]
+        else:
+            return ""
+
+    def __repr__(self):
+        return "Address(%r)" % self.st
+
+    def __str__(self):
+        return self.st
+
+class SendQueue(Handler):
+
+    def __init__(self, address):
+        self.address = Address(address)
+        self.messages = []
+        self.sent = 0
+
+    def on_start(self, drv):
+        self.driver = drv
+        self.connect()
+
+    def connect(self):
+        self.conn = self.driver.connection(self)
+        self.conn.hostname = self.address.host
+        ssn = self.conn.session()
+        snd = ssn.sender(str(self.address))
+        snd.target.address = str(self.address)
+        ssn.open()
+        snd.open()
+        self.conn.open()
+        self.link = snd
+
+    def put(self, message):
+        self.messages.append(message.encode())
+        if self.link:
+            self.pump(self.link)
+
+    def on_link_flow(self, event):
+        link = event.context
+        self.pump(link)
+
+    def pump(self, link):
+        while self.messages and link.credit > 0:
+            dlv = link.delivery(str(self.sent))
+            bytes = self.messages.pop(0)
+            link.send(bytes)
+            dlv.settle()
+            self.sent += 1
+
+    def on_transport_closed(self, event):
+        conn = event.context.connection
+        self.conn = None
+        self.link = None
+        self.driver.schedule(self.connect, 1)
+
+# XXX: terrible name for this
+class RecvQueue(Handler):
+
+    def __init__(self, address, delegate):
+        self.address = Address(address)
+        self.delegate = delegate
+        self.decoder = MessageDecoder(self.delegate)
+        self.handlers = [FlowController(1024), self.decoder, self]
+
+    def on_start(self, drv):
+        self.driver = drv
+        self.decoder.on_start(drv)
+        self.connect()
+
+    def connect(self):
+        self.conn = self.driver.connection(self)
+        self.conn.hostname = self.address.host
+        ssn = self.conn.session()
+        rcv = ssn.receiver(str(self.address))
+        rcv.source.address = str(self.address)
+        ssn.open()
+        rcv.open()
+        self.conn.open()
+
+    def on_transport_closed(self, event):
+        conn = event.context.connection
+        self.conn = None
+        self.driver.schedule(self.connect, 1)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_common.py b/examples/engine/py/db_common.py
new file mode 100644
index 0000000..584c15a
--- /dev/null
+++ b/examples/engine/py/db_common.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import sqlite3
+import threading
+
+class Db(object):
+    def __init__(self, db, events):
+        self.db = db
+        self.events = events
+        self.tasks = Queue.Queue()
+        self.position = None
+        self.pending_events = []
+        self.thread = threading.Thread(target=self._process)
+        self.thread.daemon=True
+        self.thread.start()
+
+    def reset(self):
+        self.tasks.put(lambda conn: self._reset())
+
+    def load(self, records, event=None):
+        self.tasks.put(lambda conn: self._load(conn, records, event))
+
+    def insert(self, id, data, event=None):
+        self.tasks.put(lambda conn: self._insert(conn, id, data, event))
+
+    def delete(self, id, event=None):
+        self.tasks.put(lambda conn: self._delete(conn, id, event))
+
+    def _reset(self, ignored=None):
+        self.position = None
+
+    def _load(self, conn, records, event):
+        if self.position:
+            cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
+        else:
+            cursor = conn.execute("SELECT * FROM records ORDER BY id")
+        while not records.full():
+            row = cursor.fetchone()
+            if row:
+                self.position = row['id']
+                records.put(dict(row))
+            else:
+                break
+        if event:
+            self.events.trigger(event)
+
+    def _insert(self, conn, id, data, event):
+        if id:
+            conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
+        else:
+            conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
+        if event:
+            self.pending_events.append(event)
+
+    def _delete(self, conn, id, event):
+        conn.execute("DELETE FROM records WHERE id=?", (id,))
+        if event:
+            self.pending_events.append(event)
+
+    def _process(self):
+        conn = sqlite3.connect(self.db)
+        conn.row_factory = sqlite3.Row
+        with conn:
+            while True:
+                f = self.tasks.get(True)
+                try:
+                    while True:
+                        f(conn)
+                        f = self.tasks.get(False)
+                except Queue.Empty: pass
+                conn.commit()
+                for event in self.pending_events:
+                    self.events.trigger(event)
+                self.pending_events = []

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_ctrl.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_ctrl.py b/examples/engine/py/db_ctrl.py
new file mode 100755
index 0000000..b28e0eb
--- /dev/null
+++ b/examples/engine/py/db_ctrl.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlite3
+import sys
+
+if len(sys.argv) < 3:
+    print "Usage: %s [init|insert|list] db" % sys.argv[0]
+else:
+    conn = sqlite3.connect(sys.argv[2])
+    with conn:
+        if sys.argv[1] == "init":
+            conn.execute("DROP TABLE IF EXISTS records")
+            conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
+            conn.commit()
+        elif sys.argv[1] == "list":
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM records")
+            rows = cursor.fetchall()
+            for r in rows:
+                print r
+        elif sys.argv[1] == "insert":
+            while True:
+                l = sys.stdin.readline()
+                if not l: break
+                conn.execute("INSERT INTO records(description) VALUES (?)", (l.rstrip(),))
+            conn.commit()
+        else:
+            print "Unrecognised command: %s" %  sys.argv[1]

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
new file mode 100755
index 0000000..8b4490d
--- /dev/null
+++ b/examples/engine/py/db_recv.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactors import ApplicationEvent, Container
+from db_common import Db
+
+class Recv(MessagingHandler):
+    def __init__(self, url):
+        super(Recv, self).__init__(auto_accept=False)
+        self.url = url
+        self.delay = 0
+        # TODO: load last tag from db
+        self.last_id = None
+
+    def on_start(self, event):
+        self.db = Db("dst_db", event.container.get_event_trigger())
+        event.container.create_receiver(self.url)
+
+    def on_record_inserted(self, event):
+        self.accept(event.delivery)
+
+    def on_message(self, event):
+        id = int(event.message.id)
+        if (not self.last_id) or id > self.last_id:
+            self.last_id = id
+            self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+            print "inserted message %s" % id
+        else:
+            self.accept(event.delivery)
+
+try:
+    Container(Recv("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
new file mode 100755
index 0000000..ce3ce79
--- /dev/null
+++ b/examples/engine/py/db_send.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import time
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import ApplicationEvent, Container
+from db_common import Db
+
+class Send(MessagingHandler):
+    def __init__(self, url):
+        super(Send, self).__init__()
+        self.url = url
+        self.delay = 0
+        self.sent = 0
+        self.load_count = 0
+        self.records = Queue.Queue(maxsize=50)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.db = Db("src_db", self.container.get_event_trigger())
+        self.sender = self.container.create_sender(self.url)
+
+    def on_records_loaded(self, event):
+        if self.records.empty():
+            if event.subject == self.load_count:
+                print "Exhausted available data, waiting to recheck..."
+                # check for new data after 5 seconds
+                self.container.schedule(time.time() + 5, link=self.sender, subject="data")
+        else:
+            self.send()
+
+    def request_records(self):
+        if not self.records.full():
+            print "loading records..."
+            self.load_count += 1
+            self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count))
+
+    def on_credit(self, event):
+        self.send()
+
+    def send(self):
+        while self.sender.credit and not self.records.empty():
+            record = self.records.get(False)
+            id = record['id']
+            self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+            self.sent += 1
+            print "sent message %s" % id
+        self.request_records()
+
+    def on_settled(self, event):
+        id = int(event.delivery.tag)
+        self.db.delete(id)
+        print "settled message %s" % id
+
+    def on_disconnected(self, event):
+        self.db.reset()
+
+    def on_timer(self, event):
+        if event.subject == "data":
+            print "Rechecking for data..."
+            self.request_records()
+
+try:
+    Container(Send("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
new file mode 100755
index 0000000..92d6083
--- /dev/null
+++ b/examples/engine/py/helloworld.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
+        self.address = address
+
+    def on_start(self, event):
+        conn = event.container.connect(self.server)
+        event.container.create_receiver(conn, self.address)
+        event.container.create_sender(conn, self.address)
+
+    def on_credit(self, event):
+        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.close()
+
+    def on_message(self, event):
+        print event.message.body
+        event.connection.close()
+
+Container(HelloWorld("localhost:5672", "examples")).run()
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_blocking.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_blocking.py b/examples/engine/py/helloworld_blocking.py
new file mode 100755
index 0000000..9c5e062
--- /dev/null
+++ b/examples/engine/py/helloworld_blocking.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.utils import BlockingConnection
+from proton.handlers import IncomingMessageHandler
+
+class HelloWorldReceiver(IncomingMessageHandler):
+    def on_message(self, event):
+        print event.message.body
+        event.connection.close()
+
+conn = BlockingConnection("localhost:5672")
+conn.create_receiver("examples", handler=HelloWorldReceiver())
+sender = conn.create_sender("examples")
+sender.send_msg(Message(body=u"Hello World!"));
+conn.run()
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
new file mode 100755
index 0000000..c961fe5
--- /dev/null
+++ b/examples/engine/py/helloworld_direct.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class HelloWorld(MessagingHandler):
+    def __init__(self, url):
+        super(HelloWorld, self).__init__()
+        self.url = url
+
+    def on_start(self, event):
+        self.acceptor = event.container.listen(self.url)
+        event.container.create_sender(self.url)
+
+    def on_credit(self, event):
+        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.close()
+
+    def on_message(self, event):
+        print event.message.body
+
+    def on_accepted(self, event):
+        event.connection.close()
+
+    def on_connection_closed(self, event):
+        self.acceptor.close()
+
+Container(HelloWorld("localhost:8888/examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
new file mode 100755
index 0000000..8873357
--- /dev/null
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
+        self.address = address
+
+    def on_start(self, event):
+        self.eventloop = event.container
+        self.acceptor = event.container.listen(self.server)
+        conn = event.container.connect(self.server)
+        event.container.create_sender(conn, self.address)
+
+    def on_credit(self, event):
+        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.close()
+
+    def on_message(self, event):
+        print event.message.body
+
+    def on_accepted(self, event):
+        event.connection.close()
+
+    def on_connection_closed(self, event):
+        self.acceptor.close()
+        self.eventloop.stop()
+
+TornadoLoop(HelloWorld("localhost:8888", "examples")).run()
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
new file mode 100755
index 0000000..f7d4c26
--- /dev/null
+++ b/examples/engine/py/helloworld_tornado.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
+        self.address = address
+
+    def on_start(self, event):
+        self.eventloop = event.container
+        conn = event.container.connect(self.server)
+        event.container.create_receiver(conn, self.address)
+        event.container.create_sender(conn, self.address)
+
+    def on_credit(self, event):
+        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.close()
+
+    def on_message(self, event):
+        print event.message.body
+        event.connection.close()
+
+    def on_connection_closed(self, event):
+        self.eventloop.stop()
+
+TornadoLoop(HelloWorld("localhost:5672", "examples")).run()
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_server.py b/examples/engine/py/proton_server.py
new file mode 100644
index 0000000..8a5077b
--- /dev/null
+++ b/examples/engine/py/proton_server.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import MessagingHandler
+
+class Server(MessagingHandler):
+    def __init__(self, host, address):
+        super(Server, self).__init__()
+        self.container = Container(self)
+        self.conn = self.container.connect(host)
+        self.receiver = self.container.create_receiver(self.conn, address)
+        self.senders = {}
+        self.relay = None
+
+    def on_message(self, event):
+        self.on_request(event.message.body, event.message.reply_to)
+
+    def on_connection_open(self, event):
+        if event.connection.remote_offered_capabilities and "ANONYMOUS-RELAY" in event.connection.remote_offered_capabilities:
+            self.relay = self.container.create_sender(self.conn, None)
+
+    def on_connection_close(self, endpoint, error):
+        if error: print "Closed due to %s" % error
+        self.conn.close()
+
+    def run(self):
+        self.container.run()
+
+    def send(self, response, reply_to):
+        sender = self.relay
+        if not sender:
+            sender = self.senders.get(reply_to)
+        if not sender:
+            sender = self.container.create_sender(self.conn, reply_to)
+            self.senders[reply_to] = sender
+        msg = Message(body=response)
+        if self.relay:
+            msg.address = reply_to
+        sender.send_msg(msg)
+
+    def on_request(self, request, reply_to):
+        pass
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py
new file mode 100644
index 0000000..cfe7d6f
--- /dev/null
+++ b/examples/engine/py/proton_tornado.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import ApplicationEvent, Container, StartEvent
+import tornado.ioloop
+
+class TornadoLoop(Container):
+    def __init__(self, *handlers):
+        super(TornadoLoop, self).__init__(*handlers)
+        self.loop = tornado.ioloop.IOLoop.current()
+
+    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
+        conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
+        self.events.process()
+        return conn
+
+    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+        self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+    def add(self, conn):
+        self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+
+    def remove(self, conn):
+        self.loop.remove_handler(conn)
+
+    def run(self):
+        self.events.dispatch(StartEvent(self))
+        self.loop.start()
+
+    def stop(self):
+        self.loop.stop()
+
+    def _get_event_flags(self, conn):
+        flags = 0
+        if conn.reading():
+            flags |= tornado.ioloop.IOLoop.READ
+        # FIXME: need way to update flags to avoid busy loop
+        #if conn.writing():
+        #    flags |= tornado.ioloop.IOLoop.WRITE
+        flags |= tornado.ioloop.IOLoop.WRITE
+        return flags
+
+    def _connection_ready(self, conn, events):
+        if events & tornado.ioloop.IOLoop.READ:
+            conn.readable()
+        if events & tornado.ioloop.IOLoop.WRITE:
+            conn.writable()
+        if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed():
+            self.loop.remove_handler(conn)
+            conn.close()
+            conn.removed()
+        self.events.process()
+        self.loop.update_handler(conn, self._get_event_flags(conn))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py
new file mode 100755
index 0000000..de530d3
--- /dev/null
+++ b/examples/engine/py/recurring_timer.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton.reactors import Container, Handler
+
+class Recurring(Handler):
+    def __init__(self, period):
+        self.period = period
+
+    def on_start(self, event):
+        self.container = event.container
+        self.container.schedule(time.time() + self.period, subject=self)
+
+    def on_timer(self, event):
+        print "Tick..."
+        self.container.schedule(time.time() + self.period, subject=self)
+
+try:
+    container = Container(Recurring(1.0))
+    container.run()
+except KeyboardInterrupt:
+    container.stop()
+    print
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer_tornado.py b/examples/engine/py/recurring_timer_tornado.py
new file mode 100755
index 0000000..aeeb20c
--- /dev/null
+++ b/examples/engine/py/recurring_timer_tornado.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton.reactors import Handler
+from proton_tornado import TornadoLoop
+
+class Recurring(Handler):
+    def __init__(self, period):
+        self.period = period
+
+    def on_start(self, event):
+        self.container = event.container
+        self.container.schedule(time.time() + self.period, subject=self)
+
+    def on_timer(self, event):
+        print "Tick..."
+        self.container.schedule(time.time() + self.period, subject=self)
+
+try:
+    container = TornadoLoop(Recurring(1.0))
+    container.run()
+except KeyboardInterrupt:
+    container.stop()
+    print
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/selected_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/selected_recv.py b/examples/engine/py/selected_recv.py
new file mode 100755
index 0000000..d0df3b5
--- /dev/null
+++ b/examples/engine/py/selected_recv.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import Container, Selector
+from proton.handlers import MessagingHandler
+
+class Recv(MessagingHandler):
+    def __init__(self):
+        super(Recv, self).__init__()
+
+    def on_start(self, event):
+        conn = event.container.connect("localhost:5672")
+        event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'"))
+
+    def on_message(self, event):
+        print event.message.body
+
+try:
+    Container(Recv()).run()
+except KeyboardInterrupt: pass
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
new file mode 100755
index 0000000..3e6aad4
--- /dev/null
+++ b/examples/engine/py/server.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Server(MessagingHandler):  # Example responder: answers each request with its body upper-cased.
+    def __init__(self, host, address):  # host: where to connect; address: node the requests are received from.
+        super(Server, self).__init__()
+        self.host = host
+        self.address = address
+
+    def on_start(self, event):  # Opens the connection and the request receiver; resets sender bookkeeping.
+        self.container = event.container
+        self.conn = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
+        self.senders = {}  # reply_to address -> sender, used when no relay sender exists
+        self.relay = None  # address-less sender; only set if the peer offers ANONYMOUS-RELAY
+
+    def on_connection_opened(self, event):  # Creates the relay sender when the peer advertises ANONYMOUS-RELAY.
+        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+            self.relay = self.container.create_sender(self.conn, None)
+
+    def on_message(self, event):  # Chooses a sender (relay, cached, or newly created) and sends the reply.
+        sender = self.relay
+        if not sender:
+            sender = self.senders.get(event.message.reply_to)
+        if not sender:
+            sender = self.container.create_sender(self.conn, event.message.reply_to)
+            self.senders[event.message.reply_to] = sender
+        sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))  # assumes the body has .upper() (a string) -- TODO confirm
+
+try:  # Script entry point: serve requests from "examples" via localhost:5672.
+    Container(Server("localhost:5672", "examples")).run()
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server_tx.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
new file mode 100755
index 0000000..0305a3f
--- /dev/null
+++ b/examples/engine/py/server_tx.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import MessagingHandler, TransactionHandler
+
+class TxRequest(TransactionHandler):  # Per-request handler: sends the reply and accepts the request in one transaction.
+    def __init__(self, response, sender, request_delivery):  # Stores the reply, the sender to use and the request's delivery.
+        super(TxRequest, self).__init__()
+        self.response = response
+        self.sender = sender
+        self.request_delivery = request_delivery
+
+    def on_transaction_declared(self, event):  # Does the send and the accept under event.transaction, then commits it.
+        self.sender.send_msg(self.response, transaction=event.transaction)
+        self.accept(self.request_delivery, transaction=event.transaction)  # NOTE(review): accept() is not defined here; presumably inherited from TransactionHandler -- confirm
+        event.transaction.commit()
+
+    def on_transaction_committed(self, event):  # Reports a successful commit on stdout.
+        print "Request processed successfully"
+
+    def on_transaction_aborted(self, event):  # Reports an aborted transaction on stdout.
+        print "Request processing aborted"
+
+
+class TxServer(MessagingHandler):  # Example responder that handles every request inside its own transaction.
+    def __init__(self, host, address):  # host: where to connect; address: node the requests are received from.
+        super(TxServer, self).__init__(auto_accept=False)  # acceptance is done explicitly by TxRequest
+        self.host = host
+        self.address = address
+
+    def on_start(self, event):  # Opens the connection and the request receiver; resets sender bookkeeping.
+        self.container = event.container
+        self.conn = event.container.connect(self.host, reconnect=False)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
+        self.senders = {}  # reply_to address -> sender, used when no relay sender exists
+        self.relay = None  # address-less sender; only set if the peer offers ANONYMOUS-RELAY
+
+    def on_message(self, event):  # Chooses a sender, builds the upper-cased reply, declares a transaction for it.
+        sender = self.relay
+        if not sender:
+            sender = self.senders.get(event.message.reply_to)
+        if not sender:
+            sender = self.container.create_sender(self.conn, event.message.reply_to)
+            self.senders[event.message.reply_to] = sender
+
+        response = Message(address=event.message.reply_to, body=event.message.body.upper())  # assumes a string body -- TODO confirm
+        self.container.declare_transaction(self.conn, handler=TxRequest(response, sender, event.delivery))
+
+    def on_connection_opened(self, event):  # Same callback name the non-transactional Server example uses for this check.
+        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+            self.relay = self.container.create_sender(self.conn, None)
+
+try:  # Script entry point: serve requests from "examples" via localhost:5672.
+    Container(TxServer("localhost:5672", "examples")).run()
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py
new file mode 100755
index 0000000..6825c86
--- /dev/null
+++ b/examples/engine/py/simple_recv.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Recv(MessagingHandler):  # Example receiver: prints the body of every message it is given.
+    def __init__(self, url):  # url: passed unchanged to create_receiver() in on_start.
+        super(Recv, self).__init__()
+        self.url = url
+
+    def on_start(self, event):  # Creates the receiver directly from the stored url.
+        event.container.create_receiver(self.url)
+
+    def on_message(self, event):  # Prints the body of each delivered message.
+        print event.message.body
+
+try:  # Script entry point: receive from localhost:5672/examples.
+    Container(Recv("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
new file mode 100755
index 0000000..21530ef
--- /dev/null
+++ b/examples/engine/py/simple_send.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Send(MessagingHandler):  # Example sender: sends a fixed number of messages and waits for them all to be accepted.
+    def __init__(self, url, messages):  # url: passed to create_sender(); messages: how many to send in total.
+        super(Send, self).__init__()
+        self.url = url
+        self.sent = 0  # messages handed to the sender so far
+        self.confirmed = 0  # messages for which on_accepted has fired
+        self.total = messages
+
+    def on_start(self, event):  # Creates the sender directly from the stored url.
+        event.container.create_sender(self.url)
+
+    def on_credit(self, event):  # Sends while the sender has credit and the total has not been reached.
+        while event.sender.credit and self.sent < self.total:
+            msg = Message(body={'sequence':(self.sent+1)})  # sequence numbers start at 1
+            event.sender.send_msg(msg)
+            self.sent += 1
+
+    def on_accepted(self, event):  # Counts the confirmation; closes the connection once all are confirmed.
+        self.confirmed += 1
+        if self.confirmed == self.total:
+            print "all messages confirmed"
+            event.connection.close()
+
+    def on_disconnected(self, event):  # Rewinds sent to confirmed; presumably so unconfirmed messages are re-sent later -- confirm
+        self.sent = self.confirmed
+
+try:  # Script entry point: send 10000 messages to localhost:5672/examples.
+    Container(Send("localhost:5672/examples", 10000)).run()
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/sync_client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/sync_client.py b/examples/engine/py/sync_client.py
new file mode 100755
index 0000000..362385a
--- /dev/null
+++ b/examples/engine/py/sync_client.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Demonstrates the client side of the synchronous request-response pattern
+(also known as RPC or Remote Procedure Call) using proton.
+
+"""
+
+from proton import Message, Url, ConnectionException, Timeout
+from proton.utils import BlockingConnection
+from proton.handlers import IncomingMessageHandler
+import sys
+
+class SyncRequestClient(IncomingMessageHandler):
+    """
+    Implementation of the synchronous request-response (aka RPC) pattern.
+    Create an instance and call invoke() to send a request and wait for a response.
+    """
+
+    def __init__(self, url, timeout=None):  # timeout is handed unchanged to BlockingConnection
+        """
+        @param url: a proton.Url or a URL string of the form 'host:port/path'
+            host:port is used to connect, path is used to identify the remote messaging endpoint.
+        """
+        super(SyncRequestClient, self).__init__()
+        url = Url(url).defaults()  # normalise first: a plain URL string has no .path attribute
+        self.connection = BlockingConnection(url, timeout=timeout)
+        self.sender = self.connection.create_sender(url.path)
+        # dynamic=True generates a unique address for this receiver; credit=1 admits the first response.
+        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
+        self.response = None
+
+    def invoke(self, request):
+        """Send a request, wait for and return the response"""
+        request.reply_to = self.reply_to
+        self.sender.send_msg(request)
+        self.connection.wait(lambda: self.response, msg="Waiting for response")
+        response = self.response
+        self.response = None    # Ready for next response.
+        self.receiver.flow(1)   # Set up credit for the next response.
+        return response
+
+    @property
+    def reply_to(self):
+        """Return the dynamic address of our receiver."""
+        return self.receiver.remote_source.address
+
+    def on_message(self, event):
+        """Called when we receive a message for our receiver."""
+        self.response = event.message # Store the response
+
+    def close(self):  # Closes the underlying blocking connection.
+        self.connection.close()
+
+
+if __name__ == '__main__':  # Script entry point: send four fixed requests and print each reply.
+    url = Url("0.0.0.0/examples")  # default target, used when no argument is given
+    if len(sys.argv) > 1: url = Url(sys.argv[1])  # first command-line argument overrides the default
+
+    invoker = SyncRequestClient(url, timeout=2)
+    try:
+        REQUESTS= ["Twas brillig, and the slithy toves",
+                   "Did gire and gymble in the wabe.",
+                   "All mimsy were the borogroves,",
+                   "And the mome raths outgrabe."]
+        for request in REQUESTS:
+            response = invoker.invoke(Message(body=request))
+            print "%s => %s" % (request, response.body)
+    finally:  # close the connection even if a request fails
+        invoker.close()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py
new file mode 100755
index 0000000..fc4bb8a
--- /dev/null
+++ b/examples/engine/py/tx_recv.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):  # Example receiver that accepts messages in transactional batches.
+    def __init__(self, batch_size):  # batch_size: number of messages accepted per transaction.
+        super(TxRecv, self).__init__(prefetch=0)  # presumably disables automatic credit; credit is issued via flow() below -- confirm
+        self.current_batch = 0  # messages accepted in the transaction in progress
+        self.batch_size = batch_size
+
+    def on_start(self, event):  # Opens connection and receiver, then declares the first transaction.
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672")
+        self.receiver = self.container.create_receiver(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
+        self.transaction = None  # set in on_transaction_declared
+
+    def on_message(self, event):  # Prints and accepts the message; commits once the batch is full.
+        print event.message.body
+        self.accept(event.delivery, self.transaction)
+        self.current_batch += 1
+        if self.current_batch == self.batch_size:
+            self.transaction.commit()
+            self.transaction = None  # cleared until the next transaction is declared
+
+    def on_transaction_declared(self, event):  # Issues credit for one batch and records the new transaction.
+        self.receiver.flow(self.batch_size)
+        self.transaction = event.transaction
+
+    def on_transaction_committed(self, event):  # Resets the batch counter and declares the next transaction.
+        self.current_batch = 0
+        self.container.declare_transaction(self.conn, handler=self)
+
+    def on_disconnected(self, event):  # Resets the batch counter when the connection is lost.
+        self.current_batch = 0
+
+try:  # Script entry point: receive in transactional batches of 10.
+    Container(TxRecv(10)).run()
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py
new file mode 100755
index 0000000..6eb320e
--- /dev/null
+++ b/examples/engine/py/tx_recv_interactive.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import threading
+from proton.reactors import ApplicationEvent, Container
+from proton.handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):  # Transactional receiver driven by commands typed on stdin.
+    def __init__(self):
+        super(TxRecv, self).__init__(prefetch=0)  # presumably disables automatic credit; on_fetch issues it one at a time -- confirm
+
+    def on_start(self, event):  # Opens connection and receiver, then declares the first transaction.
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672")
+        self.receiver = self.container.create_receiver(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True)  # NOTE(review): later declares below omit settle_before_discharge -- confirm intended
+        self.transaction = None  # set in on_transaction_declared
+
+    def on_message(self, event):  # Prints the body and accepts the delivery through the current transaction.
+        print event.message.body
+        self.transaction.accept(event.delivery)
+
+    def on_transaction_declared(self, event):  # Records the new transaction and reports it on stdout.
+        self.transaction = event.transaction
+        print "transaction declared"
+
+    def on_transaction_committed(self, event):  # Reports the commit and declares the next transaction.
+        print "transaction committed"
+        self.container.declare_transaction(self.conn, handler=self)
+
+    def on_transaction_aborted(self, event):  # Reports the abort and declares the next transaction.
+        print "transaction aborted"
+        self.container.declare_transaction(self.conn, handler=self)
+
+    def on_commit(self, event):  # Commits the current transaction; presumably reached via the 'commit' command -- confirm
+        self.transaction.commit()
+
+    def on_abort(self, event):  # Aborts the current transaction; presumably reached via the 'abort' command -- confirm
+        self.transaction.abort()
+
+    def on_fetch(self, event):  # Issues credit for exactly one more message.
+        self.receiver.flow(1)
+
+    def on_quit(self, event):  # Closes the receiver, then its connection.
+        c = self.receiver.connection  # taken before the receiver is closed
+        self.receiver.close()
+        c.close()
+
+try:  # Script entry point: run the container in a background thread, read commands on stdin.
+    reactor = Container(TxRecv())
+    events = reactor.get_event_trigger()
+    thread = threading.Thread(target=reactor.run)
+    thread.daemon=True  # daemon thread, so it does not keep the process alive after the loop ends
+    thread.start()
+
+    print "Enter 'fetch', 'commit', 'abort' or 'quit'"
+    while True:
+        line = sys.stdin.readline()
+        if line:
+            events.trigger(ApplicationEvent(line.strip()))  # the typed word becomes the event name
+        else:  # empty read means end of input
+            break
+except KeyboardInterrupt: pass  # Ctrl-C ends the example without a traceback.
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org