You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/12/12 12:53:36 UTC
[1/2] qpid-proton git commit: Initial set of engine examples along
with supporting additions to the library.
Repository: qpid-proton
Updated Branches:
refs/heads/master 9a72a30cd -> 34e64e324
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
new file mode 100755
index 0000000..5b11280
--- /dev/null
+++ b/examples/engine/py/tx_send.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxSend(TransactionalClientHandler):
+ def __init__(self, messages, batch_size):
+ super(TxSend, self).__init__()
+ self.current_batch = 0
+ self.committed = 0
+ self.confirmed = 0
+ self.total = messages
+ self.batch_size = batch_size
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = self.container.connect("localhost:5672", handler=self)
+ self.sender = self.container.create_sender(self.conn, "examples")
+ self.container.declare_transaction(self.conn, handler=self)
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ self.send()
+
+ def on_credit(self, event):
+ self.send()
+
+ def send(self):
+ while self.transaction and self.sender.credit and self.committed < self.total:
+ msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
+ self.sender.send_msg(msg, transaction=self.transaction)
+ self.current_batch += 1
+ if self.current_batch == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+
+ def on_accepted(self, event):
+ if event.sender == self.sender:
+ self.confirmed += 1
+
+ def on_transaction_committed(self, event):
+ self.committed += self.current_batch
+ if self.committed == self.total:
+ print "all messages committed"
+ event.connection.close()
+ else:
+ self.current_batch = 0
+ self.container.declare_transaction(self.conn, handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+try:
+ Container(TxSend(10000, 10)).run()
+except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py
new file mode 100755
index 0000000..c051408
--- /dev/null
+++ b/examples/engine/py/tx_send_sync.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxSend(TransactionalClientHandler):
+ def __init__(self, messages, batch_size):
+ super(TxSend, self).__init__()
+ self.current_batch = 0
+ self.committed = 0
+ self.confirmed = 0
+ self.total = messages
+ self.batch_size = batch_size
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = self.container.connect("localhost:5672", handler=self)
+ self.sender = self.container.create_sender(self.conn, "examples")
+ self.container.declare_transaction(self.conn, handler=self)
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ self.send()
+
+ def on_credit(self, event):
+ self.send()
+
+ def send(self):
+ while self.transaction and self.current_batch < self.batch_size and self.sender.credit and self.committed < self.total:
+ msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
+ self.sender.send_msg(msg, transaction=self.transaction)
+ self.current_batch += 1
+
+ def on_accepted(self, event):
+ if event.sender == self.sender:
+ self.confirmed += 1
+ if self.confirmed == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+ self.confirmed = 0
+
+ def on_transaction_committed(self, event):
+ self.committed += self.current_batch
+ if self.committed == self.total:
+ print "all messages committed"
+ event.connection.close()
+ else:
+ self.current_batch = 0
+ self.container.declare_transaction(self.conn, handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+try:
+ Container(TxSend(10000, 10)).run()
+except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/CMakeLists.txt b/proton-c/bindings/python/CMakeLists.txt
index 6be421e..ee660b6 100644
--- a/proton-c/bindings/python/CMakeLists.txt
+++ b/proton-c/bindings/python/CMakeLists.txt
@@ -53,22 +53,27 @@ if (NOT PYTHON_SITEARCH_PACKAGES)
endif()
set (pysrc-generated cproton.py)
-set (pysrc proton/__init__.py)
+set (pysrc
+ proton/__init__.py
+ proton/handlers.py
+ proton/reactors.py
+ proton/utils.py
+ )
-macro (py_compile directory files)
+macro (py_compile directory files artifacts)
foreach (src_file ${files})
install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile ${src_file}
WORKING_DIRECTORY ${directory})")
install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile ${src_file}
WORKING_DIRECTORY ${directory})")
- list(APPEND PYTHON_ARTIFACTS ${directory}/${src_file}
+ list(APPEND ${artifacts} ${directory}/${src_file}
${directory}/${src_file}c
${directory}/${src_file}o)
endforeach (src_file)
endmacro(py_compile)
-py_compile(${CMAKE_CURRENT_BINARY_DIR} ${pysrc-generated})
-py_compile(${CMAKE_CURRENT_SOURCE_DIR} ${pysrc})
+py_compile(${CMAKE_CURRENT_BINARY_DIR} ${pysrc-generated} CPROTON_ARTIFACTS)
+py_compile(${CMAKE_CURRENT_SOURCE_DIR} "${pysrc}" PROTON_ARTIFACTS)
find_program(EPYDOC_EXE epydoc)
mark_as_advanced (EPYDOC_EXE)
@@ -84,9 +89,12 @@ if (EPYDOC_EXE)
${OPTIONAL_ARG})
endif (EPYDOC_EXE)
-install(FILES ${PYTHON_ARTIFACTS}
+install(FILES ${CPROTON_ARTIFACTS}
DESTINATION ${PYTHON_SITEARCH_PACKAGES}
COMPONENT Python)
+install(FILES ${PROTON_ARTIFACTS}
+ DESTINATION "${PYTHON_SITEARCH_PACKAGES}/proton/"
+ COMPONENT Python)
install(TARGETS ${SWIG_MODULE_cproton_REAL_NAME}
DESTINATION ${PYTHON_SITEARCH_PACKAGES}
COMPONENT Python)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 0fb7270..a4ec1c6 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -85,6 +85,9 @@ except ImportError:
def uuid4():
return uuid.UUID(bytes=random_uuid())
+def generate_uuid():
+ return uuid.uuid4()
+
try:
bytes()
except NameError:
@@ -2965,7 +2968,24 @@ class Delivery(object):
@property
def link(self):
- return Link._wrap_link(pn_delivery_link(self._dlv))
+ if not self.released:
+ return Link._wrap_link(pn_delivery_link(self._dlv))
+ else:
+ return None
+
+ @property
+ def session(self):
+ if self.link:
+ return self.link.session
+ else:
+ return None
+
+ @property
+ def connection(self):
+ if self.session:
+ return self.session.connection
+ else:
+ return None
class TransportException(ProtonException):
pass
@@ -3384,7 +3404,10 @@ class Collector:
clazz = pn_class_name(pn_event_class(event))
context = wrappers[clazz](pn_event_context(event))
- return Event(clazz, context, EventType.TYPES[pn_event_type(event)])
+ if isinstance(context, EventBase):
+ return context
+ else:
+ return Event(clazz, context, EventType.TYPES[pn_event_type(event)])
def pop(self):
ev = self.peek()
@@ -3396,7 +3419,7 @@ class Collector:
pn_collector_free(self._impl)
del self._impl
-class EventType:
+class EventType(object):
TYPES = {}
@@ -3416,7 +3439,20 @@ def dispatch(handler, method, *args):
elif hasattr(handler, "on_unhandled"):
return handler.on_unhandled(method, args)
-class Event:
+class EventBase(object):
+
+ def __init__(self, clazz, context, type):
+ self.clazz = clazz
+ self.context = context
+ self.type = type
+
+ def _popped(self, collector):
+ pass
+
+ def dispatch(self, handler):
+ return dispatch(handler, self.type.method, self)
+
+class Event(EventBase):
CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init")
CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound")
@@ -3453,28 +3489,19 @@ class Event:
TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed")
def __init__(self, clazz, context, type):
- self.clazz = clazz
- self.context = context
- self.type = type
+ super(Event, self).__init__(clazz, context, type)
def _popped(self, collector):
if self.type in (Event.LINK_FINAL, Event.SESSION_FINAL,
Event.CONNECTION_FINAL):
collector._contexts.remove(self.context)
- def dispatch(self, handler):
- return dispatch(handler, self.type.method, self)
-
@property
def connection(self):
if self.clazz == "pn_connection":
return self.context
- elif self.clazz == "pn_session":
- return self.context.connection
- elif self.clazz == "pn_link":
+ elif self.clazz in ["pn_transport", "pn_session", "pn_link", "pn_delivery"]:
return self.context.connection
- elif self.clazz == "pn_delivery" and not self.context.released:
- return self.context.link.connection
else:
return None
@@ -3482,10 +3509,8 @@ class Event:
def session(self):
if self.clazz == "pn_session":
return self.context
- elif self.clazz == "pn_link":
+ elif self.clazz in ["pn_link", "pn_delivery"]:
return self.context.session
- elif self.clazz == "pn_delivery" and not self.context.released:
- return self.context.link.session
else:
return None
@@ -3493,7 +3518,7 @@ class Event:
def link(self):
if self.clazz == "pn_link":
return self.context
- elif self.clazz == "pn_delivery" and not self.context.released:
+ elif self.clazz in ["pn_delivery"]:
return self.context.link
else:
return None
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
new file mode 100644
index 0000000..71ab837
--- /dev/null
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -0,0 +1,440 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import heapq, os, Queue, re, socket, time, types
+from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
+from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
+from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+from select import select
+
+class FlowController(Handler):
+ """
+ A handler that controls a configured credit window for associated
+ receivers.
+ """
+ def __init__(self, window=1):
+ self.window = window
+
+ def top_up(self, link):
+ delta = self.window - link.credit
+ link.flow(delta)
+
+ def on_link_local_open(self, event):
+ if event.link.is_receiver:
+ self.top_up(event.link)
+
+ def on_link_remote_open(self, event):
+ if event.link.is_receiver:
+ self.top_up(event.link)
+
+ def on_link_flow(self, event):
+ if event.link.is_receiver:
+ self.top_up(event.link)
+
+ def on_delivery(self, event):
+ if not event.delivery.released and event.delivery.link.is_receiver:
+ self.top_up(event.delivery.link)
+
+def nested_handlers(handlers):
+ # currently only allows for a single level of nesting
+ nested = []
+ for h in handlers:
+ nested.append(h)
+ if hasattr(h, 'handlers'):
+ nested.extend(getattr(h, 'handlers'))
+ return nested
+
+def add_nested_handler(handler, nested):
+ if hasattr(handler, 'handlers'):
+ getattr(handler, 'handlers').append(nested)
+ else:
+ handler.handlers = [nested]
+
+class ScopedHandler(Handler):
+ """
+ An internal handler that checks for handlers scoped to the engine
+ objects an event relates to. E.g it allows delivery, link, session
+ or connection scoped handlers that will only be called with events
+ for the object to which they are scoped.
+ """
+ scopes = ["delivery", "link", "session", "connection"]
+
+ def on_unhandled(self, method, args):
+ event = args[0]
+ if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
+ return
+
+ objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
+ targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
+ handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
+ for h in handlers:
+ h(event)
+
+
+class OutgoingMessageHandler(Handler):
+ """
+ A utility for simpler and more intuitive handling of delivery
+ events related to outgoing i.e. sent messages.
+ """
+ def __init__(self, auto_settle=True, delegate=None):
+ self.auto_settle = auto_settle
+ self.delegate = delegate
+
+ def on_link_flow(self, event):
+ if event.link.is_sender and event.link.credit:
+ self.on_credit(event)
+
+ def on_delivery(self, event):
+ dlv = event.delivery
+ if dlv.released: return
+ if dlv.link.is_sender and dlv.updated:
+ if dlv.remote_state == Delivery.ACCEPTED:
+ self.on_accepted(event)
+ elif dlv.remote_state == Delivery.REJECTED:
+ self.on_rejected(event)
+ elif dlv.remote_state == Delivery.RELEASED:
+ self.on_released(event)
+ elif dlv.remote_state == Delivery.MODIFIED:
+ self.on_modified(event)
+ if dlv.settled:
+ self.on_settled(event)
+ if self.auto_settle:
+ dlv.settle()
+
+ def on_credit(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_credit', event)
+
+ def on_accepted(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_accepted', event)
+
+ def on_rejected(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_rejected', event)
+
+ def on_released(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_released', event)
+
+ def on_modified(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_modified', event)
+
+ def on_settled(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_settled', event)
+
+def recv_msg(delivery):
+ msg = Message()
+ msg.decode(delivery.link.recv(delivery.pending))
+ delivery.link.advance()
+ return msg
+
+class Reject(ProtonException):
+ """
+ An exception that indicate a message should be rejected
+ """
+ pass
+
+class Acking(object):
+ def accept(self, delivery):
+ self.settle(delivery, Delivery.ACCEPTED)
+
+ def reject(self, delivery):
+ self.settle(delivery, Delivery.REJECTED)
+
+ def release(self, delivery, delivered=True):
+ if delivered:
+ self.settle(delivery, Delivery.MODIFIED)
+ else:
+ self.settle(delivery, Delivery.RELEASED)
+
+ def settle(self, delivery, state=None):
+ if state:
+ delivery.update(state)
+ delivery.settle()
+
+class IncomingMessageHandler(Handler, Acking):
+ """
+ A utility for simpler and more intuitive handling of delivery
+ events related to incoming i.e. received messages.
+ """
+
+ def __init__(self, auto_accept=True, delegate=None):
+ self.delegate = delegate
+ self.auto_accept = auto_accept
+
+ def on_delivery(self, event):
+ dlv = event.delivery
+ if dlv.released or not dlv.link.is_receiver: return
+ if dlv.readable and not dlv.partial:
+ event.message = recv_msg(dlv)
+ try:
+ self.on_message(event)
+ if self.auto_accept:
+ dlv.update(Delivery.ACCEPTED)
+ dlv.settle()
+ except Reject:
+ dlv.update(Delivery.REJECTED)
+ dlv.settle()
+ elif dlv.updated and dlv.settled:
+ self.on_settled(event)
+
+ def on_message(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_message', event)
+
+ def on_settled(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_settled', event)
+
+class EndpointStateHandler(Handler):
+ """
+ A utility that exposes 'endpoint' events i.e. the open/close for
+ links, sessions and connections in a more intuitive manner. A
+ XXX_opened method will be called when both local and remote peers
+ have opened the link, session or connection. This can be used to
+ confirm a locally initiated action for example. A XXX_opening
+ method will be called when the remote peer has requested an open
+ that was not initiated locally. By default this will simply open
+ locally, which then triggers the XXX_opened call. The same applies
+ to close.
+ """
+
+ def __init__(self, peer_close_is_error=False, delegate=None):
+ self.delegate = delegate
+ self.peer_close_is_error = peer_close_is_error
+
+ def is_local_open(self, endpoint):
+ return endpoint.state & Endpoint.LOCAL_ACTIVE
+
+ def is_local_uninitialised(self, endpoint):
+ return endpoint.state & Endpoint.LOCAL_UNINIT
+
+ def is_local_closed(self, endpoint):
+ return endpoint.state & Endpoint.LOCAL_CLOSED
+
+ def is_remote_open(self, endpoint):
+ return endpoint.state & Endpoint.REMOTE_ACTIVE
+
+ def is_remote_closed(self, endpoint):
+ return endpoint.state & Endpoint.REMOTE_CLOSED
+
+ def print_error(self, endpoint, endpoint_type):
+ if endpoint.remote_condition:
+ print endpoint.remote_condition.description
+ elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
+ print "%s closed by peer" % endpoint_type
+
+ def on_link_remote_close(self, event):
+ if event.link.remote_condition:
+ self.on_link_error(event)
+ elif self.is_local_closed(event.link):
+ self.on_link_closed(event)
+ else:
+ self.on_link_closing(event)
+ event.link.close()
+
+ def on_session_remote_close(self, event):
+ if event.session.remote_condition:
+ self.on_session_error(event)
+ elif self.is_local_closed(event.session):
+ self.on_session_closed(event)
+ else:
+ self.on_session_closing(event)
+ event.session.close()
+
+ def on_connection_remote_close(self, event):
+ if event.connection.remote_condition:
+ self.on_connection_error(event)
+ elif self.is_local_closed(event.connection):
+ self.on_connection_closed(event)
+ else:
+ self.on_connection_closing(event)
+ event.connection.close()
+
+ def on_connection_local_open(self, event):
+ if self.is_remote_open(event.connection):
+ self.on_connection_opened(event)
+
+ def on_connection_remote_open(self, event):
+ if self.is_local_open(event.connection):
+ self.on_connection_opened(event)
+ elif self.is_local_uninitialised(event.connection):
+ self.on_connection_opening(event)
+ event.connection.open()
+
+ def on_session_local_open(self, event):
+ if self.is_remote_open(event.session):
+ self.on_session_opened(event)
+
+ def on_session_remote_open(self, event):
+ if self.is_local_open(event.session):
+ self.on_session_opened(event)
+ elif self.is_local_uninitialised(event.session):
+ self.on_session_opening(event)
+ event.session.open()
+
+ def on_link_local_open(self, event):
+ if self.is_remote_open(event.link):
+ self.on_link_opened(event)
+
+ def on_link_remote_open(self, event):
+ if self.is_local_open(event.link):
+ self.on_link_opened(event)
+ elif self.is_local_uninitialised(event.link):
+ self.on_link_opening(event)
+ event.link.open()
+
+ def on_connection_opened(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_connection_opened', event)
+
+ def on_session_opened(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_session_opened', event)
+
+ def on_link_opened(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_link_opened', event)
+
+ def on_connection_opening(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_connection_opening', event)
+
+ def on_session_opening(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_session_opening', event)
+
+ def on_link_opening(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_link_opening', event)
+
+ def on_connection_error(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_connection_error', event)
+ else:
+ self.print_error(event.connection, "connection")
+
+ def on_session_error(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_session_error', event)
+ else:
+ self.print_error(event.session, "session")
+ event.connection.close()
+
+ def on_link_error(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_link_error', event)
+ else:
+ self.print_error(event.link, "link")
+ event.connection.close()
+
+ def on_connection_closed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_connection_closed', event)
+
+ def on_session_closed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_session_closed', event)
+
+ def on_link_closed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_link_closed', event)
+
+ def on_connection_closing(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_connection_closing', event)
+ elif self.peer_close_is_error:
+ self.on_connection_error(event)
+
+ def on_session_closing(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_session_closing', event)
+ elif self.peer_close_is_error:
+ self.on_session_error(event)
+
+ def on_link_closing(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_link_closing', event)
+ elif self.peer_close_is_error:
+ self.on_link_error(event)
+
+class MessagingHandler(Handler, Acking):
+ """
+ A general purpose handler that makes the proton-c events somewhat
+ simpler to deal with and.or avoids repetitive tasks for common use
+ cases.
+ """
+ def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
+ self.handlers = []
+ # FlowController if used needs to see event before
+ # IncomingMessageHandler, as the latter may involve the
+ # delivery being released
+ if prefetch:
+ self.handlers.append(FlowController(prefetch))
+ self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+ self.handlers.append(IncomingMessageHandler(auto_accept, self))
+ self.handlers.append(OutgoingMessageHandler(auto_settle, self))
+
+class TransactionalAcking(object):
+ def accept(self, delivery, transaction):
+ transaction.accept(delivery)
+
+class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
+ def __init__(self, auto_settle=True, delegate=None):
+ super(TransactionHandler, self).__init__(auto_settle, delegate)
+
+ def on_settled(self, event):
+ if hasattr(event.delivery, "transaction"):
+ event.transaction = event.delivery.transaction
+ event.delivery.transaction.handle_outcome(event)
+
+ def on_transaction_declared(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_transaction_declared', event)
+
+ def on_transaction_committed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_transaction_committed', event)
+
+ def on_transaction_aborted(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_transaction_aborted', event)
+
+ def on_transaction_declare_failed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_transaction_declare_failed', event)
+
+ def on_transaction_commit_failed(self, event):
+ if self.delegate:
+ dispatch(self.delegate, 'on_transaction_commit_failed', event)
+
+class TransactionalClientHandler(Handler, TransactionalAcking):
+ def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
+ super(TransactionalClientHandler, self).__init__()
+ self.handlers = []
+ # FlowController if used needs to see event before
+ # IncomingMessageHandler, as the latter may involve the
+ # delivery being released
+ if prefetch:
+ self.handlers.append(FlowController(prefetch))
+ self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+ self.handlers.append(IncomingMessageHandler(auto_accept, self))
+ self.handlers.append(TransactionHandler(auto_settle, self))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
new file mode 100644
index 0000000..cab8c31
--- /dev/null
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -0,0 +1,827 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import os, Queue, socket, time, types
+from heapq import heappush, heappop, nsmallest
+from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
+from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
+from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
+from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
+from select import select
+from proton.handlers import nested_handlers, ScopedHandler
+
+class AmqpSocket(object):
+ """
+ Associates a transport with a connection and a socket and can be
+ used in an io loop to track the io for an AMQP 1.0 connection.
+ """
+
+ def __init__(self, conn, sock, events, heartbeat=None):
+ self.events = events
+ self.conn = conn
+ self.transport = Transport()
+ if heartbeat: self.transport.idle_timeout = heartbeat
+ self.transport.bind(self.conn)
+ self.socket = sock
+ self.socket.setblocking(0)
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ self.write_done = False
+ self.read_done = False
+ self._closed = False
+
+ def accept(self, force_sasl=True):
+ if force_sasl:
+ sasl = self.transport.sasl()
+ sasl.mechanisms("ANONYMOUS")
+ sasl.server()
+ sasl.done(SASL.OK)
+ #TODO: use SASL anyway if requested by peer
+ return self
+
+ def connect(self, host, port=None, username=None, password=None, force_sasl=True):
+ if username and password:
+ sasl = self.transport.sasl()
+ sasl.plain(username, password)
+ elif force_sasl:
+ sasl = self.transport.sasl()
+ sasl.mechanisms('ANONYMOUS')
+ sasl.client()
+ try:
+ self.socket.connect_ex((host, port or 5672))
+ except socket.gaierror, e:
+ raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
+ return self
+
+ def _closed_cleanly(self):
+ return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
+
+ def closed(self):
+ if not self._closed and self.write_done and self.read_done:
+ self.close()
+ return True
+ else:
+ return False
+
+ def close(self):
+ self.socket.close()
+ self._closed = True
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self):
+ if self.read_done: return False
+ c = self.transport.capacity()
+ if c > 0:
+ return True
+ elif c < 0:
+ self.read_done = True
+ return False
+
+ def writing(self):
+ if self.write_done: return False
+ try:
+ p = self.transport.pending()
+ if p > 0:
+ return True
+ elif p < 0:
+ self.write_done = True
+ return False
+ else: # p == 0
+ return False
+ except TransportException, e:
+ self.write_done = True
+ return False
+
+ def readable(self):
+ c = self.transport.capacity()
+ if c > 0:
+ try:
+ data = self.socket.recv(c)
+ if data:
+ self.transport.push(data)
+ else:
+ if not self._closed_cleanly():
+ self.read_done = True
+ self.write_done = True
+ else:
+ self.transport.close_tail()
+ except TransportException, e:
+ print "Error on read: %s" % e
+ self.read_done = True
+ except socket.error, e:
+ print "Error on recv: %s" % e
+ self.read_done = True
+ self.write_done = True
+ elif c < 0:
+ self.read_done = True
+
+ def writable(self):
+ try:
+ p = self.transport.pending()
+ if p > 0:
+ data = self.transport.peek(p)
+ n = self.socket.send(data)
+ self.transport.pop(n)
+ elif p < 0:
+ self.write_done = True
+ except TransportException, e:
+ print "Error on write: %s" % e
+ self.write_done = True
+ except socket.error, e:
+ print "Error on send: %s" % e
+ self.write_done = True
+
+ def removed(self):
+ if not self._closed_cleanly():
+ self.transport.unbind()
+ self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
+
+ def tick(self):
+ t = self.transport.tick(time.time())
+ if t: return t
+ else: return None
+
+class AmqpAcceptor:
+ """
+ Listens for incoming sockets, creates an AmqpSocket for them and
+ adds that to the list of tracked 'selectables'. The acceptor can
+ itself be added to an io loop.
+ """
+
+ def __init__(self, events, loop, host, port):
+ self.events = events
+ self.loop = loop
+ self.socket = socket.socket()
+ self.socket.setblocking(0)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.socket.bind((host, port))
+ self.socket.listen(5)
+ self.loop.add(self)
+ self._closed = False
+
+ def closed(self):
+ if self._closed:
+ self.socket.close()
+ return True
+ else:
+ return False
+
+ def close(self):
+ self._closed = True
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self):
+ return not self._closed
+
+ def writing(self):
+ return False
+
+ def readable(self):
+ sock, addr = self.socket.accept()
+ if sock:
+ self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept())
+
+ def removed(self): pass
+ def tick(self): return None
+
+
+class EventInjector(object):
+ """
+ Can be added to an io loop to allow events to be triggered by an
+ external thread but handled on the event thread associated with
+ the loop.
+ """
+ def __init__(self, collector):
+ self.collector = collector
+ self.queue = Queue.Queue()
+ self.pipe = os.pipe()
+ self._closed = False
+
+ def trigger(self, event):
+ self.queue.put(event)
+ os.write(self.pipe[1], "!")
+
+ def closed(self):
+ return self._closed and self.queue.empty()
+
+ def close(self):
+ self._closed = True
+
+ def fileno(self):
+ return self.pipe[0]
+
+ def reading(self):
+ return True
+
+ def writing(self):
+ return False
+
+ def readable(self):
+ os.read(self.pipe[0], 512)
+ while not self.queue.empty():
+ event = self.queue.get()
+ self.collector.put(event.context, event.type)
+
+ def removed(self): pass
+ def tick(self): return None
+
+class PQueue:
+
+ def __init__(self):
+ self.entries = []
+
+ def add(self, priority, task):
+ heappush(self.entries, (priority, task))
+
+ def peek(self):
+ if self.entries:
+ return nsmallest(1, self.entries)[0]
+ else:
+ return None
+
+ def pop(self):
+ if self.entries:
+ return heappop(self.entries)
+ else:
+ return None
+
+ def __nonzero__(self):
+ if self.entries:
+ return True
+ else:
+ return False
+
+class Timer:
+ def __init__(self, collector):
+ self.collector = collector
+ self.events = PQueue()
+
+ def schedule(self, deadline, event):
+ self.events.add(deadline, event)
+
+ def tick(self):
+ while self.events:
+ deadline, event = self.events.peek()
+ if time.time() > deadline:
+ self.events.pop()
+ self.collector.put(event.context, event.type)
+ else:
+ return deadline
+ return None
+
+ @property
+ def pending(self):
+ return bool(self.events)
+
+class Events(object):
+ def __init__(self, *handlers):
+ self.collector = Collector()
+ self.timer = Timer(self.collector)
+ self.handlers = handlers
+
+ def connection(self):
+ conn = Connection()
+ conn.collect(self.collector)
+ return conn
+
+ def process(self):
+ result = False
+ while True:
+ ev = self.collector.peek()
+ if ev:
+ self.dispatch(ev)
+ self.collector.pop()
+ result = True
+ else:
+ return result
+
+ def dispatch(self, event):
+ for h in self.handlers:
+ event.dispatch(h)
+
+ @property
+ def empty(self):
+ return self.collector.peek() == None and not self.timer.pending
+
+class Names(object):
+ def __init__(self, base=10000):
+ self.names = []
+ self.base = base
+
+ def number(self, name):
+ if name not in self.names:
+ self.names.append(name)
+ return self.names.index(name) + self.base
+
+class ExtendedEventType(EventType):
+ USED = Names()
+ """
+ Event type identifier for events defined outside the proton-c
+ library
+ """
+ def __init__(self, name, number=None):
+ super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name)
+ self.name = name
+
+class ApplicationEvent(EventBase):
+ """
+ Application defined event, which can optionally be associated with
+ an engine object and or an arbitrary subject
+ """
+ def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
+ super(ApplicationEvent, self).__init__(PN_PYREF, self, ExtendedEventType(typename))
+ self.connection = connection
+ self.session = session
+ self.link = link
+ self.delivery = delivery
+ if self.delivery:
+ self.link = self.delivery.link
+ if self.link:
+ self.session = self.link.session
+ if self.session:
+ self.connection = self.session.connection
+ self.subject = subject
+
+ def __repr__(self):
+ objects = [self.connection, self.session, self.link, self.delivery, self.subject]
+ return "%s(%s)" % (typename, ", ".join([str(o) for o in objects if o is not None]))
+
+class StartEvent(ApplicationEvent):
+ def __init__(self, container):
+ super(StartEvent, self).__init__("start")
+ self.container = container
+
+def _min(a, b):
+ if a and b: return min(a, b)
+ elif a: return a
+ else: return b
+
+class SelectLoop(object):
+ """
+ An io loop based on select()
+ """
+ def __init__(self, events):
+ self.events = events
+ self.selectables = []
+ self._abort = False
+
+ def abort(self):
+ self._abort = True
+
+ def add(self, selectable):
+ self.selectables.append(selectable)
+
+ def remove(self, selectable):
+ self.selectables.remove(selectable)
+
+ @property
+ def redundant(self):
+ return self.events.empty and not self.selectables
+
+ @property
+ def aborted(self):
+ return self._abort
+
+ def run(self):
+ while not (self._abort or self.redundant):
+ self.do_work()
+
+ def do_work(self, timeout=None):
+ """@return True if some work was done, False if time-out expired"""
+ tick = self.events.timer.tick()
+ while self.events.process():
+ if self._abort: return
+ tick = self.events.timer.tick()
+
+ stable = False
+ while not stable:
+ reading = []
+ writing = []
+ closed = []
+ for s in self.selectables:
+ if s.reading(): reading.append(s)
+ if s.writing(): writing.append(s)
+ if s.closed(): closed.append(s)
+ else: tick = _min(tick, s.tick())
+
+ for s in closed:
+ self.selectables.remove(s)
+ s.removed()
+ stable = len(closed) == 0
+
+ if self.redundant:
+ return
+
+ if tick:
+ timeout = _min(tick - time.time(), timeout)
+ if timeout and timeout < 0:
+ timeout = 0
+ if reading or writing or timeout:
+ readable, writable, _ = select(reading, writing, [], timeout)
+ for s in self.selectables:
+ s.tick()
+ for s in readable:
+ s.readable()
+ for s in writable:
+ s.writable()
+
+ return bool(readable or writable)
+ else:
+ return False
+
+def delivery_tags():
+ count = 1
+ while True:
+ yield str(count)
+ count += 1
+
+def send_msg(sender, msg, tag=None, handler=None, transaction=None):
+ dlv = sender.delivery(tag or next(sender.tags))
+ if transaction:
+ dlv.local.data = [transaction.id]
+ dlv.update(0x34)
+ if handler:
+ dlv.context = handler
+ sender.send(msg.encode())
+ sender.advance()
+ return dlv
+
+def _send_msg(self, msg, tag=None, handler=None, transaction=None):
+ return send_msg(self, msg, tag, handler, transaction)
+
+
+class Transaction(object):
+ """
+ Class to track state of an AMQP 1.0 transaction.
+ """
+ def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
+ self.txn_ctrl = txn_ctrl
+ self.handler = handler
+ self.id = None
+ self._declare = None
+ self._discharge = None
+ self.failed = False
+ self._pending = []
+ self.settle_before_discharge = settle_before_discharge
+ self.declare()
+
+ def commit(self):
+ self.discharge(False)
+
+ def abort(self):
+ self.discharge(True)
+
+ def declare(self):
+ self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
+
+ def discharge(self, failed):
+ self.failed = failed
+ self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
+
+ def _send_ctrl(self, descriptor, value):
+ delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler)
+ delivery.transaction = self
+ return delivery
+
+ def accept(self, delivery):
+ self.update(delivery, PN_ACCEPTED)
+ if self.settle_before_discharge:
+ delivery.settle()
+ else:
+ self._pending.append(delivery)
+
+ def update(self, delivery, state=None):
+ if state:
+ delivery.local.data = [self.id, Described(ulong(state), [])]
+ delivery.update(0x34)
+
+ def _release_pending(self):
+ for d in self._pending:
+ d.update(Delivery.RELEASED)
+ d.settle()
+ self._clear_pending()
+
+ def _clear_pending(self):
+ self._pending = []
+
+ def handle_outcome(self, event):
+ if event.delivery == self._declare:
+ if event.delivery.remote.data:
+ self.id = event.delivery.remote.data[0]
+ self.handler.on_transaction_declared(event)
+ elif event.delivery.remote_state == Delivery.REJECTED:
+ self.handler.on_transaction_declare_failed(event)
+ else:
+ print "Unexpected outcome for declare: %s" % event.delivery.remote_state
+ self.handler.on_transaction_declare_failed(event)
+ elif event.delivery == self._discharge:
+ if event.delivery.remote_state == Delivery.REJECTED:
+ if not self.failed:
+ self.handler.on_transaction_commit_failed(event)
+ self._release_pending() # make this optional?
+ else:
+ if self.failed:
+ self.handler.on_transaction_aborted(event)
+ self._release_pending()
+ else:
+ self.handler.on_transaction_committed(event)
+ self._clear_pending()
+
+class LinkOption(object):
+ """
+ Abstract interface for link configuration options
+ """
+ def apply(self, link):
+ """
+ Subclasses will implement any configuration logic in this
+ method
+ """
+ pass
+ def test(self, link):
+ """
+ Subclasses can override this to selectively apply an option
+ e.g. based on some link criteria
+ """
+ return True
+
+class AtMostOnce(LinkOption):
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_SETTLED
+
+class AtLeastOnce(LinkOption):
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_UNSETTLED
+ link.rcv_settle_mode = Link.RCV_FIRST
+
+class SenderOption(LinkOption):
+ def apply(self, sender): pass
+ def test(self, link): return link.is_sender
+
+class ReceiverOption(LinkOption):
+ def apply(self, receiver): pass
+ def test(self, link): return link.is_receiver
+
+class Filter(ReceiverOption):
+ def __init__(self, filter_set={}):
+ self.filter_set = filter_set
+
+ def apply(self, receiver):
+ receiver.source.filter.put_dict(self.filter_set)
+
+class Selector(Filter):
+ """
+ Configures a link with a message selector filter
+ """
+ def __init__(self, value, name='selector'):
+ super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
+
+def _apply_link_options(options, link):
+ if options:
+ if isinstance(options, list):
+ for o in options:
+ if o.test(link): o.apply(link)
+ else:
+ if options.test(link): options.apply(link)
+
+def _create_session(connection, handler=None):
+ session = connection.session()
+ session.open()
+ return session
+
+
+def _get_attr(target, name):
+ if hasattr(target, name):
+ return getattr(target, name)
+ else:
+ return None
+
+class SessionPerConnection(object):
+ def __init__(self):
+ self._default_session = None
+
+ def session(self, connection):
+ if not self._default_session:
+ self._default_session = _create_session(connection)
+ self._default_session.context = self
+ return self._default_session
+
+ def on_session_remote_close(self, event):
+ event.connection.close()
+ self._default_session = None
+
+class Connector(Handler):
+ """
+ Internal handler that triggers the necessary socket connect for an
+ opened connection.
+ """
+ def __init__(self, loop):
+ self.loop = loop
+
+ def _connect(self, connection):
+ host, port = connection.address.next()
+ #print "connecting to %s:%i" % (host, port)
+ heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None
+ self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
+ connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
+
+ def on_connection_local_open(self, event):
+ if hasattr(event.connection, "address"):
+ self._connect(event.connection)
+
+ def on_connection_remote_open(self, event):
+ if hasattr(event.connection, "reconnect"):
+ event.connection.reconnect.reset()
+
+ def on_disconnected(self, event):
+ if hasattr(event.connection, "reconnect"):
+ event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference
+ delay = event.connection.reconnect.next()
+ if delay == 0:
+ print "Disconnected, reconnecting..."
+ self._connect(event.connection)
+ else:
+ print "Disconnected will try to reconnect after %s seconds" % delay
+ self.loop.schedule(time.time() + delay, connection=event.connection, subject=self)
+ else:
+ print "Disconnected"
+
+ def on_timer(self, event):
+ if event.subject == self and event.connection:
+ self._connect(event.connection)
+
+class Backoff(object):
+ """
+ A reconnect strategy involving an increasing delay between
+ retries, up to a maximum or 10 seconds.
+ """
+ def __init__(self):
+ self.delay = 0
+
+ def reset(self):
+ self.delay = 0
+
+ def next(self):
+ current = self.delay
+ if current == 0:
+ self.delay = 0.1
+ else:
+ self.delay = min(10, 2*current)
+ return current
+
+class Urls(object):
+ def __init__(self, values):
+ self.values = [Url(v) for v in values]
+ self.i = iter(self.values)
+
+ def __iter__(self):
+ return self
+
+ def _as_pair(self, url):
+ return (url.host, url.port)
+
+ def next(self):
+ try:
+ return self._as_pair(self.i.next())
+ except StopIteration:
+ self.i = iter(self.values)
+ return self._as_pair(self.i.next())
+
+class Container(object):
+ def __init__(self, *handlers):
+ h = [Connector(self), ScopedHandler()]
+ h.extend(nested_handlers(handlers))
+ self.events = Events(*h)
+ self.loop = SelectLoop(self.events)
+ self.trigger = None
+ self.container_id = str(generate_uuid())
+
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
+ conn = self.events.connection()
+ conn._pin = conn #circular reference until the open event gets handled
+ if handler:
+ conn.context = handler
+ conn.container = self.container_id or str(generate_uuid())
+ conn.heartbeat = heartbeat
+ if url: conn.address = Urls([url])
+ elif urls: conn.address = Urls(urls)
+ elif address: conn.address = address
+ else: raise ValueError("One of url, urls or address required")
+ if reconnect:
+ conn.reconnect = reconnect
+ elif reconnect is None:
+ conn.reconnect = Backoff()
+ conn._session_policy = SessionPerConnection() #todo: make configurable
+ conn.open()
+ return conn
+
+ def _get_id(self, container, remote, local):
+ if local and remote: "%s-%s-%s" % (container, remote, local)
+ elif local: return "%s-%s" % (container, local)
+ elif remote: return "%s-%s" % (container, remote)
+ else: return "%s-%s" % (container, str(generate_uuid()))
+
+ def _get_session(self, context):
+ if isinstance(context, Url):
+ return self._get_session(self.connect(url=context))
+ elif isinstance(context, Session):
+ return context
+ elif isinstance(context, Connection):
+ if hasattr(context, '_session_policy'):
+ return context._session_policy.session(context)
+ else:
+ return _create_session(context)
+ else:
+ return context.session()
+
+ def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
+ if isinstance(context, basestring):
+ context = Url(context)
+ if isinstance(context, Url) and not target:
+ target = context.path
+ session = self._get_session(context)
+ snd = session.sender(name or self._get_id(session.connection.container, target, source))
+ if source:
+ snd.source.address = source
+ if target:
+ snd.target.address = target
+ if handler:
+ snd.context = handler
+ snd.tags = tags or delivery_tags()
+ snd.send_msg = types.MethodType(_send_msg, snd)
+ _apply_link_options(options, snd)
+ snd.open()
+ return snd
+
+ def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
+ if isinstance(context, basestring):
+ context = Url(context)
+ if isinstance(context, Url) and not source:
+ source = context.path
+ session = self._get_session(context)
+ rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
+ if source:
+ rcv.source.address = source
+ if dynamic:
+ rcv.source.dynamic = True
+ if target:
+ rcv.target.address = target
+ if handler:
+ rcv.context = handler
+ _apply_link_options(options, rcv)
+ rcv.open()
+ return rcv
+
+ def declare_transaction(self, context, handler=None, settle_before_discharge=False):
+ if not _get_attr(context, '_txn_ctrl'):
+ context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl')
+ context._txn_ctrl.target.type = Terminus.COORDINATOR
+ context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+ return Transaction(context._txn_ctrl, handler, settle_before_discharge)
+
+ def listen(self, url):
+ host, port = Urls([url]).next()
+ return AmqpAcceptor(self.events, self, host, port)
+
+ def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+ def get_event_trigger(self):
+ if not self.trigger or self.trigger.closed():
+ self.trigger = EventInjector(self.events.collector)
+ self.add(self.trigger)
+ return self.trigger
+
+ def add(self, selectable):
+ self.loop.add(selectable)
+
+ def remove(self, selectable):
+ self.loop.remove(selectable)
+
+ def run(self):
+ self.events.dispatch(StartEvent(self))
+ self.loop.run()
+
+ def stop(self):
+ self.loop.abort()
+
+ def do_work(self, timeout=None):
+ return self.loop.do_work(timeout)
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
new file mode 100644
index 0000000..03c9417
--- /dev/null
+++ b/proton-c/bindings/python/proton/utils.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import Queue, socket, time
+from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
+from proton.handlers import ScopedHandler
+
+class BlockingLink(object):
+ def __init__(self, connection, link):
+ self.connection = connection
+ self.link = link
+ self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+ msg="Opening link %s" % link.name)
+
+ def close(self):
+ self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE),
+ msg="Closing link %s" % link.name)
+
+ # Access to other link attributes.
+ def __getattr__(self, name): return getattr(self.link, name)
+
+class BlockingSender(BlockingLink):
+ def __init__(self, connection, sender):
+ super(BlockingSender, self).__init__(connection, sender)
+
+ def send_msg(self, msg):
+ delivery = send_msg(self.link, msg)
+ self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
+
+class BlockingReceiver(BlockingLink):
+ def __init__(self, connection, receiver, credit=1):
+ super(BlockingReceiver, self).__init__(connection, receiver)
+ if credit: receiver.flow(credit)
+
+class BlockingConnection(Handler):
+ """
+ A synchronous style connection wrapper.
+ """
+ def __init__(self, url, timeout=None, container=None):
+ self.timeout = timeout
+ self.container = container or Container()
+ if isinstance(url, basestring):
+ self.url = Url(url)
+ else:
+ self.url = url
+ self.conn = self.container.connect(url=self.url, handler=self)
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+ msg="Opening connection")
+
+ def create_sender(self, address, handler=None):
+ return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
+
+ def create_receiver(self, address, credit=1, dynamic=False, handler=None):
+ return BlockingReceiver(
+ self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
+
+ def close(self):
+ self.conn.close()
+ self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+ msg="Closing connection")
+
+ def run(self):
+ """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+ self.container.run()
+
+ def wait(self, condition, timeout=False, msg=None):
+ """Call do_work until condition() is true"""
+ if timeout is False:
+ timeout = self.timeout
+ if timeout is None:
+ while not condition():
+ self.container.do_work()
+ else:
+ deadline = time.time() + timeout
+ while not condition():
+ if not self.container.do_work(deadline - time.time()):
+ txt = "Connection %s timed out" % self.url
+ if msg: txt += ": " + msg
+ raise Timeout(txt)
+
+ def on_link_remote_close(self, event):
+ if event.link.state & Endpoint.LOCAL_ACTIVE:
+ self.closed(event.link.remote_condition)
+
+ def on_connection_remote_close(self, event):
+ if event.connection.state & Endpoint.LOCAL_ACTIVE:
+ self.closed(event.connection.remote_condition)
+
+ def on_disconnected(self, event):
+ raise ConnectionException("Connection %s disconnected" % self.url);
+
+ def closed(self, error=None):
+ txt = "Connection %s closed" % self.url
+ if error:
+ txt += " due to: %s" % error
+ else:
+ txt += " by peer"
+ raise ConnectionException(txt)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: Initial set of engine examples along
with supporting additions to the library.
Posted by gs...@apache.org.
Initial set of engine examples along with supporting additions to the library.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/34e64e32
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/34e64e32
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/34e64e32
Branch: refs/heads/master
Commit: 34e64e3248b67971b701d990709920fa690c12b4
Parents: 9a72a30
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Dec 5 20:49:54 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Fri Dec 12 11:04:53 2014 +0000
----------------------------------------------------------------------
examples/engine/py/README | 155 ++++
examples/engine/py/abstract_server.py | 35 +
examples/engine/py/client.py | 59 ++
examples/engine/py/client_http.py | 110 +++
examples/engine/py/common.py | 699 ++++++++++++++++
examples/engine/py/db_common.py | 93 +++
examples/engine/py/db_ctrl.py | 46 ++
examples/engine/py/db_recv.py | 54 ++
examples/engine/py/db_send.py | 85 ++
examples/engine/py/helloworld.py | 45 +
examples/engine/py/helloworld_blocking.py | 35 +
examples/engine/py/helloworld_direct.py | 47 ++
examples/engine/py/helloworld_direct_tornado.py | 52 ++
examples/engine/py/helloworld_tornado.py | 49 ++
examples/engine/py/proton_server.py | 61 ++
examples/engine/py/proton_tornado.py | 70 ++
examples/engine/py/recurring_timer.py | 43 +
examples/engine/py/recurring_timer_tornado.py | 44 +
examples/engine/py/selected_recv.py | 40 +
examples/engine/py/server.py | 56 ++
examples/engine/py/server_tx.py | 77 ++
examples/engine/py/simple_recv.py | 40 +
examples/engine/py/simple_send.py | 53 ++
examples/engine/py/sync_client.py | 88 ++
examples/engine/py/tx_recv.py | 61 ++
examples/engine/py/tx_recv_interactive.py | 83 ++
examples/engine/py/tx_send.py | 75 ++
examples/engine/py/tx_send_sync.py | 76 ++
proton-c/bindings/python/CMakeLists.txt | 20 +-
proton-c/bindings/python/proton/__init__.py | 63 +-
proton-c/bindings/python/proton/handlers.py | 440 ++++++++++
proton-c/bindings/python/proton/reactors.py | 827 +++++++++++++++++++
proton-c/bindings/python/proton/utils.py | 114 +++
33 files changed, 3870 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/README
----------------------------------------------------------------------
diff --git a/examples/engine/py/README b/examples/engine/py/README
new file mode 100644
index 0000000..a361fc4
--- /dev/null
+++ b/examples/engine/py/README
@@ -0,0 +1,155 @@
+Most (though not all) of the current examples require a broker or
+similar intermediary that supports the AMQP 1.0 protocol, allows
+anonymous connections and accepts links to and from a node named
+'examples'.
+
+------------------------------------------------------------------
+
+helloworld.py
+
+Basic example that connects to an intermediary on localhost:5672,
+establishes a subscription from the 'examples' node on that
+intermediary, then creates a sending link to the same node and sends
+one message. On receving the message back via the subcription, the
+connection is closed.
+
+helloworld_blocking.py
+
+The same as the basic helloworld.py, but using a
+synchronous/sequential style wrapper on top of the
+asynchronous/reactive API. The purpose of this example is just to show
+how different functionality can be easily layered should it be
+desired.
+
+helloworld_direct.py
+
+A variant of the basic hellpwprld example, that does not use an
+intermediary, but listens for incoming connections itself. It
+establishes a connection to itself with a link over which a single
+message is sent. This demonstrates the ease with which a simple daemon
+can be built using the API.
+
+helloworld_tornado.py
+helloworld_direct_tornado.py
+
+These are variant of the helloworld.py and helloworld_direct.py
+examples that use the event loop from the tornado library, rather than
+that provided within proton itself and demonstrate how proton can be
+used with external loops.
+
+-------------------------------------------------------------------
+
+simple_send.py
+
+An example of sending a fixed number of messages and tracking their
+(asynchronous) acknowledgement. Handles disconnection while
+maintaining an at-least-once guarantee (there may be duplicates, but
+no message in the sequence should be lost). Messages are sent through
+the 'examples' node on an intermediary accessible on port 5672 on
+localhost.
+
+simple_recv.py
+
+Subscribes to the 'examples' node on an intermediary accessible on port 5672 on
+localhost. Simply prints out the body of received messages.
+
+db_send.py
+
+A more realistic sending example, where the messages come from records
+in a simple database table. On being acknowledged the records can be
+deleted from the table. The database access is done in a separate
+thread, so as not to block the event thread during data
+access. Messages are sent through the 'examples' node on an
+intermediary accessible on port 5672 on localhost.
+
+db_recv.py
+
+A receiving example that records messages received from the 'examples'
+node on localhost:5672 in a database table and only acknowledges them
+when the insert completes. Database access is again done in a separate
+thread from the event loop.
+
+db_ctrl.py
+
+A utility for setting up the database tables for the two examples
+above. Takes two arguments, the action to perform and the name of the
+database on which to perfom it. The database used by db_send.py is
+src_db, that by db_recv.py is dst_db. The valid actions are 'init',
+which creates the table, 'list' which displays the contents and
+'insert' which inserts records from standard-in and is used to
+populate src_db, e.g. for i in `seq 1 50`; do echo "Message-$i"; done
+| ./db_ctrl.py insert src_db.
+
+tx_send.py
+
+A sender that sends messages in atomic batches using local
+transactions (this example does not persist the messages in anyway).
+
+tx_send_sync.py
+
+A variant of the former example that waits for all messages in a batch
+to be acknowledged before committing. Used only to work around an
+ordering issue in preoton that affected qpidd.
+
+tx_recv.py
+
+A receiver example that accepts batches of messages using local
+transactions.
+
+tx_recv_interactive.py
+
+A testing utility that allow interactive control of the
+transactions. Actions are keyed in to the console, 'fetch' will
+request another message, 'abort' will abort the transaction, 'commit'
+will commit it.
+
+The various send/recv examples can be mixed and matched if desired.
+
+-------------------------------------------------------------------
+
+client.py
+
+The client part of a request-response example. Sends requests and
+prints out responses. Requires an intermediary that support the AMQP
+1.0 dynamic nodes on which the responses are received. The requests
+are sent through the 'examples' node.
+
+server.py
+
+The server part of a request-response example, that receives requests
+via the examples node, converts the body to uppercase and sends the
+result back to the indicated reply address.
+
+sync_client.py
+
+A variant of the client part, that uses a blocking/synchronous style
+instead of the reactive/asynchronous style.
+
+client_http.py
+
+A variant of the client part that takes the input to be submitted in
+the request over HTTP (point your browser to localhost:8888/client)
+
+server_tx.py
+
+A variant of the server part that consumes the request and sends out
+the response atomically in a local transaction.
+
+-------------------------------------------------------------------
+
+selected_recv.py
+
+An example that uses a selector filter.
+
+-------------------------------------------------------------------
+
+recurring_timer.py
+
+An example showing a simple timer event.
+
+recurring_timer_tornado.py
+
+A variant of the above that uses the tornado eventloop instead.
+
+-------------------------------------------------------------------
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/abstract_server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/abstract_server.py b/examples/engine/py/abstract_server.py
new file mode 100755
index 0000000..2d0de32
--- /dev/null
+++ b/examples/engine/py/abstract_server.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton_server import Server
+
+class Application(Server):
+ def __init__(self, host, address):
+ super(Application, self).__init__(host, address)
+
+ def on_request(self, request, reply_to):
+ response = request.upper()
+ self.send(response, reply_to)
+ print "Request from: %s" % reply_to
+
+try:
+ Application("localhost:5672", "examples").run()
+except KeyboardInterrupt: pass
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py
new file mode 100755
index 0000000..d1e2706
--- /dev/null
+++ b/examples/engine/py/client.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Client(MessagingHandler):
+ def __init__(self, host, address, requests):
+ super(Client, self).__init__()
+ self.host = host
+ self.address = address
+ self.requests = requests
+
+ def on_start(self, event):
+ self.conn = event.container.connect(self.host)
+ self.sender = event.container.create_sender(self.conn, self.address)
+ self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)
+
+ def next_request(self):
+ if self.receiver.remote_source.address:
+ req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
+ self.sender.send_msg(req)
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.next_request()
+
+ def on_message(self, event):
+ print "%s => %s" % (self.requests.pop(0), event.message.body)
+ if self.requests:
+ self.next_request()
+ else:
+ self.conn.close()
+
+REQUESTS= ["Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."]
+
+Container(Client("localhost:5672", "examples", REQUESTS)).run()
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
new file mode 100755
index 0000000..5202f8d
--- /dev/null
+++ b/examples/engine/py/client_http.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+from tornado.ioloop import IOLoop
+import tornado.web
+
+class Client(MessagingHandler):
+ def __init__(self, host, address):
+ super(Client, self).__init__()
+ self.host = host
+ self.address = address
+ self.sent = []
+ self.pending = []
+ self.reply_address = None
+ self.sender = None
+ self.receiver = None
+
+ def on_start(self, event):
+ conn = event.container.connect(self.host)
+ self.sender = event.container.create_sender(conn, self.address)
+ self.receiver = event.container.create_receiver(conn, None, dynamic=True)
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.reply_address = event.link.remote_source.address
+ self.do_request()
+
+ def on_credit(self, event):
+ self.do_request()
+
+ def on_message(self, event):
+ if self.sent:
+ request, handler = self.sent.pop(0)
+ print "%s => %s" % (request, event.message.body)
+ handler(event.message.body)
+ self.do_request()
+
+ def do_request(self):
+ if self.pending and self.reply_address and self.sender.credit:
+ request, handler = self.pending.pop(0)
+ self.sent.append((request, handler))
+ req = Message(reply_to=self.reply_address, body=request)
+ self.sender.send_msg(req)
+
+ def request(self, body, handler):
+ self.pending.append((body, handler))
+ self.do_request()
+
+class ExampleHandler(tornado.web.RequestHandler):
+ def initialize(self, client):
+ self.client = client
+
+ def get(self):
+ self._write_open()
+ self._write_form()
+ self._write_close()
+
+ @tornado.web.asynchronous
+ def post(self):
+ client.request(self.get_body_argument("message"), lambda x: self.on_response(x))
+
+ def on_response(self, body):
+ self.set_header("Content-Type", "text/html")
+ self._write_open()
+ self._write_form()
+ self.write("Response: " + body)
+ self._write_close()
+ self.finish()
+
+ def _write_open(self):
+ self.write('<html><body>')
+
+ def _write_close(self):
+ self.write('</body></html>')
+
+ def _write_form(self):
+ self.write('<form action="/client" method="POST">'
+ 'Request: <input type="text" name="message">'
+ '<input type="submit" value="Submit">'
+ '</form>')
+
+
+client = Client("localhost:5672", "examples")
+loop = TornadoLoop(client)
+app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))])
+app.listen(8888)
+try:
+ loop.run()
+except KeyboardInterrupt:
+ loop.stop()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/common.py b/examples/engine/py/common.py
new file mode 100644
index 0000000..d4d9a69
--- /dev/null
+++ b/examples/engine/py/common.py
@@ -0,0 +1,699 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import errno, os, random, select, time, traceback
+from proton import *
+from socket import *
+from threading import Thread
+from heapq import heappush, heappop, nsmallest
+
+class Selectable:
+
+ def __init__(self, transport, socket):
+ self.transport = transport
+ self.socket = socket
+ self.write_done = False
+ self.read_done = False
+
+ def closed(self):
+ if self.write_done and self.read_done:
+ self.socket.close()
+ return True
+ else:
+ return False
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self):
+ if self.read_done: return False
+ c = self.transport.capacity()
+ if c > 0:
+ return True
+ elif c < 0:
+ self.read_done = True
+ return False
+
+ def writing(self):
+ if self.write_done: return False
+ try:
+ p = self.transport.pending()
+ if p > 0:
+ return True
+ elif p < 0:
+ self.write_done = True
+ return False
+ except TransportException, e:
+ self.write_done = True
+ return False
+
+ def readable(self):
+ c = self.transport.capacity()
+ if c > 0:
+ try:
+ data = self.socket.recv(c)
+ if data:
+ self.transport.push(data)
+ else:
+ self.transport.close_tail()
+ except error, e:
+ print "read error", e
+ self.transport.close_tail()
+ self.read_done = True
+ elif c < 0:
+ self.read_done = True
+
+ def writable(self):
+ try:
+ p = self.transport.pending()
+ if p > 0:
+ data = self.transport.peek(p)
+ n = self.socket.send(data)
+ self.transport.pop(n)
+ elif p < 0:
+ self.write_done = True
+ except error, e:
+ print "write error", e
+ self.transport.close_head()
+ self.write_done = True
+
+ def tick(self, now):
+ return self.transport.tick(now)
+
+class Acceptor:
+
+ def __init__(self, driver, host, port):
+ self.driver = driver
+ self.socket = socket()
+ self.socket.setblocking(0)
+ self.socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+ self.socket.bind((host, port))
+ self.socket.listen(5)
+ self.driver.add(self)
+
+ def closed(self):
+ return False
+
+ def fileno(self):
+ return self.socket.fileno()
+
+ def reading(self):
+ return True
+
+ def writing(self):
+ return False
+
+ def readable(self):
+ sock, addr = self.socket.accept()
+ sock.setblocking(0)
+ print "Incoming Connection:", addr
+ if sock:
+ conn = Connection()
+ conn.collect(self.driver.collector)
+ transport = Transport()
+ transport.bind(conn)
+ sasl = transport.sasl()
+ sasl.mechanisms("ANONYMOUS")
+ sasl.server()
+ sasl.done(SASL.OK)
+ sel = Selectable(transport, sock)
+ self.driver.add(sel)
+
+ def tick(self, now):
+ return None
+
+class Interrupter:
+
+ def __init__(self):
+ self.read, self.write = os.pipe()
+
+ def fileno(self):
+ return self.read
+
+ def readable(self):
+ os.read(self.read, 1024)
+
+ def interrupt(self):
+ os.write(self.write, 'x')
+
+class PQueue:
+
+ def __init__(self):
+ self.entries = []
+
+ def add(self, task, priority):
+ heappush(self.entries, (priority, task))
+
+ def peek(self):
+ if self.entries:
+ return nsmallest(1, self.entries)[0]
+ else:
+ return None
+
+ def pop(self):
+ if self.entries:
+ return heappop(self.entries)
+ else:
+ return None
+
+ def __nonzero__(self):
+ if self.entries:
+ return True
+ else:
+ return False
+
+import proton
+
+TIMER_EVENT = proton.EventType(10000, "on_timer")
+
+class Timer:
+
+ def __init__(self, collector):
+ self.collector = collector
+ self.tasks = PQueue()
+
+ def schedule(self, task, deadline):
+ self.tasks.add(task, deadline)
+
+ def tick(self, now):
+ while self.tasks:
+ deadline, task = self.tasks.peek()
+ if now > deadline:
+ self.tasks.pop()
+ self.collector.put(task, TIMER_EVENT)
+ else:
+ return deadline
+
+def _dispatch(ev, handler):
+ if ev.clazz == "pn_delivery" and ev.context.released:
+ return
+ else:
+ ev.dispatch(handler)
+
+def _expand(handlers):
+ result = []
+ for h in handlers:
+ if hasattr(h, "handlers"):
+ result.extend(h.handlers)
+ else:
+ result.append(h)
+ return result
+
+class Driver(Handler):
+
+ def __init__(self, *handlers):
+ self.collector = Collector()
+ self.handlers = _expand(handlers)
+ self.interrupter = Interrupter()
+ self.timer = Timer(self.collector)
+ self.selectables = []
+ self.now = None
+ self.deadline = None
+ self._abort = False
+ self._exit = False
+ self._thread = Thread(target=self.run)
+ self._thread.setDaemon(True)
+
+ def schedule(self, task, timeout):
+ self.timer.schedule(task, self.now + timeout)
+
+ def abort(self):
+ self._abort = True
+
+ def exit(self):
+ self._exit = True
+
+ def wakeup(self):
+ self.interrupter.interrupt()
+
+ def start(self):
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
+
+ def _init_deadline(self):
+ self.now = time.time()
+ self.deadline = None
+
+ def _update_deadline(self, t):
+ if t is None or t < self.now: return
+ if self.deadline is None or t < self.deadline:
+ self.deadline = t
+
+ @property
+ def _timeout(self):
+ if self.deadline is None:
+ return None
+ else:
+ return self.deadline - self.now
+
+ def run(self):
+ self._init_deadline()
+ for h in self.handlers:
+ dispatch(h, "on_start", self)
+
+ while True:
+ self._init_deadline()
+
+ while True:
+ self.process_events()
+ if self._abort: return
+ self._update_deadline(self.timer.tick(self.now))
+ count = self.process_events()
+ if self._abort: return
+ if not count:
+ break
+
+ reading = [self.interrupter]
+ writing = []
+
+ for s in self.selectables[:]:
+ if s.reading(): reading.append(s)
+ if s.writing(): writing.append(s)
+ self._update_deadline(s.tick(self.now))
+ if s.closed(): self.selectables.remove(s)
+
+ if self._exit and not self.selectables: return
+
+ try:
+ readable, writable, _ = select.select(reading, writing, [], self._timeout)
+ except select.error, (err, errtext):
+ if err == errno.EINTR:
+ continue
+ else:
+ raise
+
+ for s in readable:
+ s.readable()
+ for s in writable:
+ s.writable()
+
+ def process_events(self):
+ count = 0
+
+ quiesced = False
+ while True:
+ ev = self.collector.peek()
+ if ev:
+ count += 1
+ quiesced = False
+ _dispatch(ev, self)
+ for h in self.get_handlers(ev.context):
+ _dispatch(ev, h)
+ self.collector.pop()
+ elif quiesced:
+ return count
+ else:
+ for h in self.handlers:
+ dispatch(h, "on_quiesced", self)
+ quiesced = True
+
+ return count
+
+ getters = {
+ Transport: lambda x: x.connection,
+ Delivery: lambda x: x.link,
+ Sender: lambda x: x.session,
+ Receiver: lambda x: x.session,
+ Session: lambda x: x.connection,
+ }
+
+ def get_handlers(self, context):
+ if hasattr(context, "handlers"):
+ return context.handlers
+ elif context.__class__ in self.getters:
+ parent = self.getters[context.__class__](context)
+ return self.get_handlers(parent)
+ else:
+ return self.handlers
+
+ def on_connection_local_open(self, event):
+ conn = event.context
+ if conn.state & Endpoint.REMOTE_UNINIT:
+ self._connect(conn)
+
+ def _connect(self, conn):
+ transport = Transport()
+ transport.idle_timeout = 300
+ sasl = transport.sasl()
+ sasl.mechanisms("ANONYMOUS")
+ sasl.client()
+ transport.bind(conn)
+ sock = socket()
+ sock.setblocking(0)
+ hostport = conn.hostname.split(":", 1)
+ host = hostport[0]
+ if len(hostport) > 1:
+ port = int(hostport[1])
+ else:
+ port = 5672
+ sock.connect_ex((host, port))
+ selectable = Selectable(transport, sock)
+ self.add(selectable)
+
+ def on_timer(self, event):
+ event.context()
+
+ def connection(self, *handlers):
+ conn = Connection()
+ if handlers:
+ conn.handlers = _expand(handlers)
+ conn.collect(self.collector)
+ return conn
+
+ def acceptor(self, host, port):
+ return Acceptor(self, host, port)
+
+ def add(self, selectable):
+ self.selectables.append(selectable)
+
+class Handshaker(Handler):
+
+ def on_connection_remote_open(self, event):
+ conn = event.context
+ if conn.state & Endpoint.LOCAL_UNINIT:
+ conn.open()
+
+ def on_session_remote_open(self, event):
+ ssn = event.context
+ if ssn.state & Endpoint.LOCAL_UNINIT:
+ ssn.open()
+
+ def on_link_remote_open(self, event):
+ link = event.context
+ if link.state & Endpoint.LOCAL_UNINIT:
+ link.source.copy(link.remote_source)
+ link.target.copy(link.remote_target)
+ link.open()
+
+ def on_connection_remote_close(self, event):
+ conn = event.context
+ if not (conn.state & Endpoint.LOCAL_CLOSED):
+ conn.close()
+
+ def on_session_remote_close(self, event):
+ ssn = event.context
+ if not (ssn.state & Endpoint.LOCAL_CLOSED):
+ ssn.close()
+
+ def on_link_remote_close(self, event):
+ link = event.context
+ if not (link.state & Endpoint.LOCAL_CLOSED):
+ link.close()
+
+class FlowController(Handler):
+
+ def __init__(self, window):
+ self.window = window
+
+ def top_up(self, link):
+ delta = self.window - link.credit
+ link.flow(delta)
+
+ def on_link_local_open(self, event):
+ link = event.context
+ if link.is_receiver:
+ self.top_up(link)
+
+ def on_link_remote_open(self, event):
+ link = event.context
+ if link.is_receiver:
+ self.top_up(link)
+
+ def on_link_flow(self, event):
+ link = event.context
+ if link.is_receiver:
+ self.top_up(link)
+
+ def on_delivery(self, event):
+ delivery = event.context
+ if delivery.link.is_receiver:
+ self.top_up(delivery.link)
+
+class Row:
+
+ def __init__(self):
+ self.links = set()
+
+ def add(self, link):
+ self.links.add(link)
+
+ def discard(self, link):
+ self.links.discard(link)
+
+ def choose(self):
+ if self.links:
+ return random.choice(list(self.links))
+ else:
+ return None
+
+ def __iter__(self):
+ return iter(self.links)
+
+ def __nonzero__(self):
+ return bool(self.links)
+
+
+class Router(Handler):
+
+ EMPTY = Row()
+
+ def __init__(self):
+ self._outgoing = {}
+ self._incoming = {}
+
+ def incoming(self, address):
+ return self._incoming.get(address, self.EMPTY)
+
+ def outgoing(self, address):
+ return self._outgoing.get(address, self.EMPTY)
+
+ def address(self, link):
+ if link.is_sender:
+ return link.source.address or link.target.address
+ else:
+ return link.target.address
+
+ def table(self, link):
+ if link.is_sender:
+ return self._outgoing
+ else:
+ return self._incoming
+
+ def add(self, link):
+ address = self.address(link)
+ table = self.table(link)
+ row = table.get(address)
+ if row is None:
+ row = Row()
+ table[address] = row
+ row.add(link)
+
+ def remove(self, link):
+ address = self.address(link)
+ table = self.table(link)
+ row = table.get(address)
+ if row is not None:
+ row.discard(link)
+ if not row:
+ del table[address]
+
+ def on_link_local_open(self, event):
+ self.add(event.context)
+
+ def on_link_local_close(self, event):
+ self.remove(event.context)
+
+ def on_link_final(self, event):
+ self.remove(event.context)
+
+class Pool(Handler):
+
+ def __init__(self, collector, router=None):
+ self.collector = collector
+ self._connections = {}
+ if router:
+ self.outgoing_resolver = lambda address: router.outgoing(address).choose()
+ self.incoming_resolver = lambda address: router.incoming(address).choose()
+ else:
+ self.outgoing_resolver = lambda address: None
+ self.incoming_resolver = lambda address: None
+
+ def resolve(self, remote, local, resolver, constructor):
+ link = resolver(remote)
+ if link is None:
+ host = remote[2:].split("/", 1)[0]
+ conn = self._connections.get(host)
+ if conn is None:
+ conn = Connection()
+ conn.collect(self.collector)
+ conn.hostname = host
+ conn.open()
+ self._connections[host] = conn
+
+ ssn = conn.session()
+ ssn.open()
+ link = constructor(ssn, remote, local)
+ link.open()
+ return link
+
+ def on_transport_closed(self, event):
+ transport = event.context
+ conn = transport.connection
+ del self._connections[conn.hostname]
+
+ def outgoing(self, target, source=None):
+ return self.resolve(target, source, self.outgoing_resolver, self.new_outgoing)
+
+ def incoming(self, source, target=None):
+ return self.resolve(source, target, self.incoming_resolver, self.new_incoming)
+
+ def new_outgoing(self, ssn, remote, local):
+ snd = ssn.sender("%s-%s" % (local, remote))
+ snd.source.address = local
+ snd.target.address = remote
+ return snd
+
+ def new_incoming(self, ssn, remote, local):
+ rcv = ssn.receiver("%s-%s" % (remote, local))
+ rcv.source.address = remote
+ rcv.target.address = local
+ return rcv
+
+class MessageDecoder(Handler):
+
+ def __init__(self, delegate):
+ self.__delegate = delegate
+
+ def on_start(self, drv):
+ try:
+ self.__delegate
+ except AttributeError:
+ self.__delegate = self
+ self.__message = Message()
+
+ def on_delivery(self, event):
+ dlv = event.context
+ if dlv.link.is_receiver and not dlv.partial:
+ encoded = dlv.link.recv(dlv.pending)
+ self.__message.decode(encoded)
+ try:
+ dispatch(self.__delegate, "on_message", dlv.link, self.__message)
+ dlv.update(Delivery.ACCEPTED)
+ except:
+ dlv.update(Delivery.REJECTED)
+ traceback.print_exc()
+ dlv.settle()
+
+class Address:
+
+ def __init__(self, st):
+ self.st = st
+
+ @property
+ def host(self):
+ return self.st[2:].split("/", 1)[0]
+
+ @property
+ def path(self):
+ parts = self.st[2:].split("/", 1)
+ if len(parts) == 2:
+ return parts[1]
+ else:
+ return ""
+
+ def __repr__(self):
+ return "Address(%r)" % self.st
+
+ def __str__(self):
+ return self.st
+
+class SendQueue(Handler):
+
+ def __init__(self, address):
+ self.address = Address(address)
+ self.messages = []
+ self.sent = 0
+
+ def on_start(self, drv):
+ self.driver = drv
+ self.connect()
+
+ def connect(self):
+ self.conn = self.driver.connection(self)
+ self.conn.hostname = self.address.host
+ ssn = self.conn.session()
+ snd = ssn.sender(str(self.address))
+ snd.target.address = str(self.address)
+ ssn.open()
+ snd.open()
+ self.conn.open()
+ self.link = snd
+
+ def put(self, message):
+ self.messages.append(message.encode())
+ if self.link:
+ self.pump(self.link)
+
+ def on_link_flow(self, event):
+ link = event.context
+ self.pump(link)
+
+ def pump(self, link):
+ while self.messages and link.credit > 0:
+ dlv = link.delivery(str(self.sent))
+ bytes = self.messages.pop(0)
+ link.send(bytes)
+ dlv.settle()
+ self.sent += 1
+
+ def on_transport_closed(self, event):
+ conn = event.context.connection
+ self.conn = None
+ self.link = None
+ self.driver.schedule(self.connect, 1)
+
+# XXX: terrible name for this
+class RecvQueue(Handler):
+
+ def __init__(self, address, delegate):
+ self.address = Address(address)
+ self.delegate = delegate
+ self.decoder = MessageDecoder(self.delegate)
+ self.handlers = [FlowController(1024), self.decoder, self]
+
+ def on_start(self, drv):
+ self.driver = drv
+ self.decoder.on_start(drv)
+ self.connect()
+
+ def connect(self):
+ self.conn = self.driver.connection(self)
+ self.conn.hostname = self.address.host
+ ssn = self.conn.session()
+ rcv = ssn.receiver(str(self.address))
+ rcv.source.address = str(self.address)
+ ssn.open()
+ rcv.open()
+ self.conn.open()
+
+ def on_transport_closed(self, event):
+ conn = event.context.connection
+ self.conn = None
+ self.driver.schedule(self.connect, 1)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_common.py b/examples/engine/py/db_common.py
new file mode 100644
index 0000000..584c15a
--- /dev/null
+++ b/examples/engine/py/db_common.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import sqlite3
+import threading
+
+class Db(object):
+ def __init__(self, db, events):
+ self.db = db
+ self.events = events
+ self.tasks = Queue.Queue()
+ self.position = None
+ self.pending_events = []
+ self.thread = threading.Thread(target=self._process)
+ self.thread.daemon=True
+ self.thread.start()
+
+ def reset(self):
+ self.tasks.put(lambda conn: self._reset())
+
+ def load(self, records, event=None):
+ self.tasks.put(lambda conn: self._load(conn, records, event))
+
+ def insert(self, id, data, event=None):
+ self.tasks.put(lambda conn: self._insert(conn, id, data, event))
+
+ def delete(self, id, event=None):
+ self.tasks.put(lambda conn: self._delete(conn, id, event))
+
+ def _reset(self, ignored=None):
+ self.position = None
+
+ def _load(self, conn, records, event):
+ if self.position:
+ cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
+ else:
+ cursor = conn.execute("SELECT * FROM records ORDER BY id")
+ while not records.full():
+ row = cursor.fetchone()
+ if row:
+ self.position = row['id']
+ records.put(dict(row))
+ else:
+ break
+ if event:
+ self.events.trigger(event)
+
+ def _insert(self, conn, id, data, event):
+ if id:
+ conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
+ else:
+ conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
+ if event:
+ self.pending_events.append(event)
+
+ def _delete(self, conn, id, event):
+ conn.execute("DELETE FROM records WHERE id=?", (id,))
+ if event:
+ self.pending_events.append(event)
+
+ def _process(self):
+ conn = sqlite3.connect(self.db)
+ conn.row_factory = sqlite3.Row
+ with conn:
+ while True:
+ f = self.tasks.get(True)
+ try:
+ while True:
+ f(conn)
+ f = self.tasks.get(False)
+ except Queue.Empty: pass
+ conn.commit()
+ for event in self.pending_events:
+ self.events.trigger(event)
+ self.pending_events = []
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_ctrl.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_ctrl.py b/examples/engine/py/db_ctrl.py
new file mode 100755
index 0000000..b28e0eb
--- /dev/null
+++ b/examples/engine/py/db_ctrl.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlite3
+import sys
+
+if len(sys.argv) < 3:
+ print "Usage: %s [init|insert|list] db" % sys.argv[0]
+else:
+ conn = sqlite3.connect(sys.argv[2])
+ with conn:
+ if sys.argv[1] == "init":
+ conn.execute("DROP TABLE IF EXISTS records")
+ conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
+ conn.commit()
+ elif sys.argv[1] == "list":
+ cursor = conn.cursor()
+ cursor.execute("SELECT * FROM records")
+ rows = cursor.fetchall()
+ for r in rows:
+ print r
+ elif sys.argv[1] == "insert":
+ while True:
+ l = sys.stdin.readline()
+ if not l: break
+ conn.execute("INSERT INTO records(description) VALUES (?)", (l.rstrip(),))
+ conn.commit()
+ else:
+ print "Unrecognised command: %s" % sys.argv[1]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
new file mode 100755
index 0000000..8b4490d
--- /dev/null
+++ b/examples/engine/py/db_recv.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactors import ApplicationEvent, Container
+from db_common import Db
+
+class Recv(MessagingHandler):
+ def __init__(self, url):
+ super(Recv, self).__init__(auto_accept=False)
+ self.url = url
+ self.delay = 0
+ # TODO: load last tag from db
+ self.last_id = None
+
+ def on_start(self, event):
+ self.db = Db("dst_db", event.container.get_event_trigger())
+ event.container.create_receiver(self.url)
+
+ def on_record_inserted(self, event):
+ self.accept(event.delivery)
+
+ def on_message(self, event):
+ id = int(event.message.id)
+ if (not self.last_id) or id > self.last_id:
+ self.last_id = id
+ self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+ print "inserted message %s" % id
+ else:
+ self.accept(event.delivery)
+
+try:
+ Container(Recv("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
new file mode 100755
index 0000000..ce3ce79
--- /dev/null
+++ b/examples/engine/py/db_send.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import time
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import ApplicationEvent, Container
+from db_common import Db
+
+class Send(MessagingHandler):
+ def __init__(self, url):
+ super(Send, self).__init__()
+ self.url = url
+ self.delay = 0
+ self.sent = 0
+ self.load_count = 0
+ self.records = Queue.Queue(maxsize=50)
+
+ def on_start(self, event):
+ self.container = event.container
+ self.db = Db("src_db", self.container.get_event_trigger())
+ self.sender = self.container.create_sender(self.url)
+
+ def on_records_loaded(self, event):
+ if self.records.empty():
+ if event.subject == self.load_count:
+ print "Exhausted available data, waiting to recheck..."
+ # check for new data after 5 seconds
+ self.container.schedule(time.time() + 5, link=self.sender, subject="data")
+ else:
+ self.send()
+
+ def request_records(self):
+ if not self.records.full():
+ print "loading records..."
+ self.load_count += 1
+ self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count))
+
+ def on_credit(self, event):
+ self.send()
+
+ def send(self):
+ while self.sender.credit and not self.records.empty():
+ record = self.records.get(False)
+ id = record['id']
+ self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+ self.sent += 1
+ print "sent message %s" % id
+ self.request_records()
+
+ def on_settled(self, event):
+ id = int(event.delivery.tag)
+ self.db.delete(id)
+ print "settled message %s" % id
+
+ def on_disconnected(self, event):
+ self.db.reset()
+
+ def on_timer(self, event):
+ if event.subject == "data":
+ print "Rechecking for data..."
+ self.request_records()
+
+try:
+ Container(Send("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
new file mode 100755
index 0000000..92d6083
--- /dev/null
+++ b/examples/engine/py/helloworld.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class HelloWorld(MessagingHandler):
+ def __init__(self, server, address):
+ super(HelloWorld, self).__init__()
+ self.server = server
+ self.address = address
+
+ def on_start(self, event):
+ conn = event.container.connect(self.server)
+ event.container.create_receiver(conn, self.address)
+ event.container.create_sender(conn, self.address)
+
+ def on_credit(self, event):
+ event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.close()
+
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
+
+Container(HelloWorld("localhost:5672", "examples")).run()
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_blocking.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_blocking.py b/examples/engine/py/helloworld_blocking.py
new file mode 100755
index 0000000..9c5e062
--- /dev/null
+++ b/examples/engine/py/helloworld_blocking.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.utils import BlockingConnection
+from proton.handlers import IncomingMessageHandler
+
+class HelloWorldReceiver(IncomingMessageHandler):
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
+
+conn = BlockingConnection("localhost:5672")
+conn.create_receiver("examples", handler=HelloWorldReceiver())
+sender = conn.create_sender("examples")
+sender.send_msg(Message(body=u"Hello World!"));
+conn.run()
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
new file mode 100755
index 0000000..c961fe5
--- /dev/null
+++ b/examples/engine/py/helloworld_direct.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class HelloWorld(MessagingHandler):
+ def __init__(self, url):
+ super(HelloWorld, self).__init__()
+ self.url = url
+
+ def on_start(self, event):
+ self.acceptor = event.container.listen(self.url)
+ event.container.create_sender(self.url)
+
+ def on_credit(self, event):
+ event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.close()
+
+ def on_message(self, event):
+ print event.message.body
+
+ def on_accepted(self, event):
+ event.connection.close()
+
+ def on_connection_closed(self, event):
+ self.acceptor.close()
+
+Container(HelloWorld("localhost:8888/examples")).run()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
new file mode 100755
index 0000000..8873357
--- /dev/null
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorld(MessagingHandler):
+ def __init__(self, server, address):
+ super(HelloWorld, self).__init__()
+ self.server = server
+ self.address = address
+
+ def on_start(self, event):
+ self.eventloop = event.container
+ self.acceptor = event.container.listen(self.server)
+ conn = event.container.connect(self.server)
+ event.container.create_sender(conn, self.address)
+
+ def on_credit(self, event):
+ event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.close()
+
+ def on_message(self, event):
+ print event.message.body
+
+ def on_accepted(self, event):
+ event.connection.close()
+
+ def on_connection_closed(self, event):
+ self.acceptor.close()
+ self.eventloop.stop()
+
+TornadoLoop(HelloWorld("localhost:8888", "examples")).run()
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
new file mode 100755
index 0000000..f7d4c26
--- /dev/null
+++ b/examples/engine/py/helloworld_tornado.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton_tornado import TornadoLoop
+
+class HelloWorld(MessagingHandler):
+ def __init__(self, server, address):
+ super(HelloWorld, self).__init__()
+ self.server = server
+ self.address = address
+
+ def on_start(self, event):
+ self.eventloop = event.container
+ conn = event.container.connect(self.server)
+ event.container.create_receiver(conn, self.address)
+ event.container.create_sender(conn, self.address)
+
+ def on_credit(self, event):
+ event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.close()
+
+ def on_message(self, event):
+ print event.message.body
+ event.connection.close()
+
+ def on_connection_closed(self, event):
+ self.eventloop.stop()
+
+TornadoLoop(HelloWorld("localhost:5672", "examples")).run()
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_server.py b/examples/engine/py/proton_server.py
new file mode 100644
index 0000000..8a5077b
--- /dev/null
+++ b/examples/engine/py/proton_server.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import MessagingHandler
+
+class Server(MessagingHandler):
+ def __init__(self, host, address):
+ super(Server, self).__init__()
+ self.container = Container(self)
+ self.conn = self.container.connect(host)
+ self.receiver = self.container.create_receiver(self.conn, address)
+ self.senders = {}
+ self.relay = None
+
+ def on_message(self, event):
+ self.on_request(event.message.body, event.message.reply_to)
+
+ def on_connection_open(self, event):
+ if event.connection.remote_offered_capabilities and "ANONYMOUS-RELAY" in event.connection.remote_offered_capabilities:
+ self.relay = self.container.create_sender(self.conn, None)
+
+ def on_connection_close(self, endpoint, error):
+ if error: print "Closed due to %s" % error
+ self.conn.close()
+
+ def run(self):
+ self.container.run()
+
+ def send(self, response, reply_to):
+ sender = self.relay
+ if not sender:
+ sender = self.senders.get(reply_to)
+ if not sender:
+ sender = self.container.create_sender(self.conn, reply_to)
+ self.senders[reply_to] = sender
+ msg = Message(body=response)
+ if self.relay:
+ msg.address = reply_to
+ sender.send_msg(msg)
+
+ def on_request(self, request, reply_to):
+ pass
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/proton_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py
new file mode 100644
index 0000000..cfe7d6f
--- /dev/null
+++ b/examples/engine/py/proton_tornado.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import ApplicationEvent, Container, StartEvent
+import tornado.ioloop
+
+class TornadoLoop(Container):
+ def __init__(self, *handlers):
+ super(TornadoLoop, self).__init__(*handlers)
+ self.loop = tornado.ioloop.IOLoop.current()
+
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
+ conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
+ self.events.process()
+ return conn
+
+ def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+ def add(self, conn):
+ self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+
+ def remove(self, conn):
+ self.loop.remove_handler(conn)
+
+ def run(self):
+ self.events.dispatch(StartEvent(self))
+ self.loop.start()
+
+ def stop(self):
+ self.loop.stop()
+
+ def _get_event_flags(self, conn):
+ flags = 0
+ if conn.reading():
+ flags |= tornado.ioloop.IOLoop.READ
+ # FIXME: need way to update flags to avoid busy loop
+ #if conn.writing():
+ # flags |= tornado.ioloop.IOLoop.WRITE
+ flags |= tornado.ioloop.IOLoop.WRITE
+ return flags
+
+ def _connection_ready(self, conn, events):
+ if events & tornado.ioloop.IOLoop.READ:
+ conn.readable()
+ if events & tornado.ioloop.IOLoop.WRITE:
+ conn.writable()
+ if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed():
+ self.loop.remove_handler(conn)
+ conn.close()
+ conn.removed()
+ self.events.process()
+ self.loop.update_handler(conn, self._get_event_flags(conn))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py
new file mode 100755
index 0000000..de530d3
--- /dev/null
+++ b/examples/engine/py/recurring_timer.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton.reactors import Container, Handler
+
+class Recurring(Handler):
+ def __init__(self, period):
+ self.period = period
+
+ def on_start(self, event):
+ self.container = event.container
+ self.container.schedule(time.time() + self.period, subject=self)
+
+ def on_timer(self, event):
+ print "Tick..."
+ self.container.schedule(time.time() + self.period, subject=self)
+
+try:
+ container = Container(Recurring(1.0))
+ container.run()
+except KeyboardInterrupt:
+ container.stop()
+ print
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/recurring_timer_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer_tornado.py b/examples/engine/py/recurring_timer_tornado.py
new file mode 100755
index 0000000..aeeb20c
--- /dev/null
+++ b/examples/engine/py/recurring_timer_tornado.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton.reactors import Handler
+from proton_tornado import TornadoLoop
+
+class Recurring(Handler):
+ def __init__(self, period):
+ self.period = period
+
+ def on_start(self, event):
+ self.container = event.container
+ self.container.schedule(time.time() + self.period, subject=self)
+
+ def on_timer(self, event):
+ print "Tick..."
+ self.container.schedule(time.time() + self.period, subject=self)
+
+try:
+ container = TornadoLoop(Recurring(1.0))
+ container.run()
+except KeyboardInterrupt:
+ container.stop()
+ print
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/selected_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/selected_recv.py b/examples/engine/py/selected_recv.py
new file mode 100755
index 0000000..d0df3b5
--- /dev/null
+++ b/examples/engine/py/selected_recv.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import Container, Selector
+from proton.handlers import MessagingHandler
+
+class Recv(MessagingHandler):
+ def __init__(self):
+ super(Recv, self).__init__()
+
+ def on_start(self, event):
+ conn = event.container.connect("localhost:5672")
+ event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'"))
+
+ def on_message(self, event):
+ print event.message.body
+
+try:
+ Container(Recv()).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
new file mode 100755
index 0000000..3e6aad4
--- /dev/null
+++ b/examples/engine/py/server.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Server(MessagingHandler):
+ def __init__(self, host, address):
+ super(Server, self).__init__()
+ self.host = host
+ self.address = address
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = event.container.connect(self.host)
+ self.receiver = event.container.create_receiver(self.conn, self.address)
+ self.senders = {}
+ self.relay = None
+
+ def on_connection_opened(self, event):
+ if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+ self.relay = self.container.create_sender(self.conn, None)
+
+ def on_message(self, event):
+ sender = self.relay
+ if not sender:
+ sender = self.senders.get(event.message.reply_to)
+ if not sender:
+ sender = self.container.create_sender(self.conn, event.message.reply_to)
+ self.senders[event.message.reply_to] = sender
+ sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
+
+try:
+ Container(Server("localhost:5672", "examples")).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/server_tx.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
new file mode 100755
index 0000000..0305a3f
--- /dev/null
+++ b/examples/engine/py/server_tx.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.reactors import Container
+from proton.handlers import MessagingHandler, TransactionHandler
+
+class TxRequest(TransactionHandler):
+ def __init__(self, response, sender, request_delivery):
+ super(TxRequest, self).__init__()
+ self.response = response
+ self.sender = sender
+ self.request_delivery = request_delivery
+
+ def on_transaction_declared(self, event):
+ self.sender.send_msg(self.response, transaction=event.transaction)
+ self.accept(self.request_delivery, transaction=event.transaction)
+ event.transaction.commit()
+
+ def on_transaction_committed(self, event):
+ print "Request processed successfully"
+
+ def on_transaction_aborted(self, event):
+ print "Request processing aborted"
+
+
+class TxServer(MessagingHandler):
+ def __init__(self, host, address):
+ super(TxServer, self).__init__(auto_accept=False)
+ self.host = host
+ self.address = address
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = event.container.connect(self.host, reconnect=False)
+ self.receiver = event.container.create_receiver(self.conn, self.address)
+ self.senders = {}
+ self.relay = None
+
+ def on_message(self, event):
+ sender = self.relay
+ if not sender:
+ sender = self.senders.get(event.message.reply_to)
+ if not sender:
+ sender = self.container.create_sender(self.conn, event.message.reply_to)
+ self.senders[event.message.reply_to] = sender
+
+ response = Message(address=event.message.reply_to, body=event.message.body.upper())
+ self.container.declare_transaction(self.conn, handler=TxRequest(response, sender, event.delivery))
+
+ def on_connection_open(self, event):
+ if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
+ self.relay = self.container.create_sender(self.conn, None)
+
+try:
+ Container(TxServer("localhost:5672", "examples")).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py
new file mode 100755
index 0000000..6825c86
--- /dev/null
+++ b/examples/engine/py/simple_recv.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Recv(MessagingHandler):
+ def __init__(self, url):
+ super(Recv, self).__init__()
+ self.url = url
+
+ def on_start(self, event):
+ event.container.create_receiver(self.url)
+
+ def on_message(self, event):
+ print event.message.body
+
+try:
+ Container(Recv("localhost:5672/examples")).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
new file mode 100755
index 0000000..21530ef
--- /dev/null
+++ b/examples/engine/py/simple_send.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactors import Container
+
+class Send(MessagingHandler):
+ def __init__(self, url, messages):
+ super(Send, self).__init__()
+ self.url = url
+ self.sent = 0
+ self.confirmed = 0
+ self.total = messages
+
+ def on_start(self, event):
+ event.container.create_sender(self.url)
+
+ def on_credit(self, event):
+ while event.sender.credit and self.sent < self.total:
+ msg = Message(body={'sequence':(self.sent+1)})
+ event.sender.send_msg(msg)
+ self.sent += 1
+
+ def on_accepted(self, event):
+ self.confirmed += 1
+ if self.confirmed == self.total:
+ print "all messages confirmed"
+ event.connection.close()
+
+ def on_disconnected(self, event):
+ self.sent = self.confirmed
+
+try:
+ Container(Send("localhost:5672/examples", 10000)).run()
+except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/sync_client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/sync_client.py b/examples/engine/py/sync_client.py
new file mode 100755
index 0000000..362385a
--- /dev/null
+++ b/examples/engine/py/sync_client.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Demonstrates the client side of the synchronous request-response pattern
+(also known as RPC or Remote Procecure Call) using proton.
+
+"""
+
+from proton import Message, Url, ConnectionException, Timeout
+from proton.utils import BlockingConnection
+from proton.handlers import IncomingMessageHandler
+import sys
+
+class SyncRequestClient(IncomingMessageHandler):
+ """
+ Implementation of the synchronous request-responce (aka RPC) pattern.
+ Create an instance and call invoke() to send a request and wait for a response.
+ """
+
+ def __init__(self, url, timeout=None):
+ """
+ @param url: a proton.Url or a URL string of the form 'host:port/path'
+ host:port is used to connect, path is used to identify the remote messaging endpoint.
+ """
+ super(SyncRequestClient, self).__init__()
+ self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout)
+ self.sender = self.connection.create_sender(url.path)
+ # dynamic=true generates a unique address dynamically for this receiver.
+ # credit=1 because we want to receive 1 response message initially.
+ self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
+ self.response = None
+
+ def invoke(self, request):
+ """Send a request, wait for and return the response"""
+ request.reply_to = self.reply_to
+ self.sender.send_msg(request)
+ self.connection.wait(lambda: self.response, msg="Waiting for response")
+ response = self.response
+ self.response = None # Ready for next response.
+ self.receiver.flow(1) # Set up credit for the next response.
+ return response
+
+ @property
+ def reply_to(self):
+ """Return the dynamic address of our receiver."""
+ return self.receiver.remote_source.address
+
+ def on_message(self, event):
+ """Called when we receive a message for our receiver."""
+ self.response = event.message # Store the response
+
+ def close(self):
+ self.connection.close()
+
+
+if __name__ == '__main__':
+ url = Url("0.0.0.0/examples")
+ if len(sys.argv) > 1: url = Url(sys.argv[1])
+
+ invoker = SyncRequestClient(url, timeout=2)
+ try:
+ REQUESTS= ["Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."]
+ for request in REQUESTS:
+ response = invoker.invoke(Message(body=request))
+ print "%s => %s" % (request, response.body)
+ finally:
+ invoker.close()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py
new file mode 100755
index 0000000..fc4bb8a
--- /dev/null
+++ b/examples/engine/py/tx_recv.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.reactors import Container
+from proton.handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):
+ def __init__(self, batch_size):
+ super(TxRecv, self).__init__(prefetch=0)
+ self.current_batch = 0
+ self.batch_size = batch_size
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = self.container.connect("localhost:5672")
+ self.receiver = self.container.create_receiver(self.conn, "examples")
+ self.container.declare_transaction(self.conn, handler=self)
+ self.transaction = None
+
+ def on_message(self, event):
+ print event.message.body
+ self.accept(event.delivery, self.transaction)
+ self.current_batch += 1
+ if self.current_batch == self.batch_size:
+ self.transaction.commit()
+ self.transaction = None
+
+ def on_transaction_declared(self, event):
+ self.receiver.flow(self.batch_size)
+ self.transaction = event.transaction
+
+ def on_transaction_committed(self, event):
+ self.current_batch = 0
+ self.container.declare_transaction(self.conn, handler=self)
+
+ def on_disconnected(self, event):
+ self.current_batch = 0
+
+try:
+ Container(TxRecv(10)).run()
+except KeyboardInterrupt: pass
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/34e64e32/examples/engine/py/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py
new file mode 100755
index 0000000..6eb320e
--- /dev/null
+++ b/examples/engine/py/tx_recv_interactive.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import threading
+from proton.reactors import ApplicationEvent, Container
+from proton.handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):
+ def __init__(self):
+ super(TxRecv, self).__init__(prefetch=0)
+
+ def on_start(self, event):
+ self.container = event.container
+ self.conn = self.container.connect("localhost:5672")
+ self.receiver = self.container.create_receiver(self.conn, "examples")
+ self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True)
+ self.transaction = None
+
+ def on_message(self, event):
+ print event.message.body
+ self.transaction.accept(event.delivery)
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ print "transaction declared"
+
+ def on_transaction_committed(self, event):
+ print "transaction committed"
+ self.container.declare_transaction(self.conn, handler=self)
+
+ def on_transaction_aborted(self, event):
+ print "transaction aborted"
+ self.container.declare_transaction(self.conn, handler=self)
+
+ def on_commit(self, event):
+ self.transaction.commit()
+
+ def on_abort(self, event):
+ self.transaction.abort()
+
+ def on_fetch(self, event):
+ self.receiver.flow(1)
+
+ def on_quit(self, event):
+ c = self.receiver.connection
+ self.receiver.close()
+ c.close()
+
+try:
+ reactor = Container(TxRecv())
+ events = reactor.get_event_trigger()
+ thread = threading.Thread(target=reactor.run)
+ thread.daemon=True
+ thread.start()
+
+ print "Enter 'fetch', 'commit' or 'abort'"
+ while True:
+ line = sys.stdin.readline()
+ if line:
+ events.trigger(ApplicationEvent(line.strip()))
+ else:
+ break
+except KeyboardInterrupt: pass
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org