You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/11/26 14:16:26 UTC

qpid-proton git commit: Added missing files to repo

Repository: qpid-proton
Updated Branches:
  refs/heads/examples a37304f96 -> 0fd8b1fc5


Added missing files to repo


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0fd8b1fc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0fd8b1fc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0fd8b1fc

Branch: refs/heads/examples
Commit: 0fd8b1fc538fa9ca1ae14a44c47d70943312a812
Parents: a37304f
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Nov 26 13:17:08 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Nov 26 13:17:08 2014 +0000

----------------------------------------------------------------------
 tutorial/proton_handlers.py | 409 +++++++++++++++++++++++++++++++++++++++
 tutorial/proton_utils.py    | 115 +++++++++++
 2 files changed, 524 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0fd8b1fc/tutorial/proton_handlers.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_handlers.py b/tutorial/proton_handlers.py
new file mode 100644
index 0000000..1381f31
--- /dev/null
+++ b/tutorial/proton_handlers.py
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import heapq, os, Queue, re, socket, time, types
+from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
+from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
+from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+from select import select
+
+class FlowController(Handler):
+    """
+    Tops up the credit of receiving links to a fixed window whenever a
+    link is opened (locally or remotely), a flow event is seen, or a
+    delivery event arrives for a delivery that has not been released.
+    """
+
+    def __init__(self, window=1):
+        # Credit level that top_up() restores a link to.
+        self.window = window
+
+    def top_up(self, link):
+        """
+        Call link.flow() with the difference between the window and the
+        link's current credit.
+        """
+        link.flow(self.window - link.credit)
+
+    def _top_up_if_receiver(self, link):
+        # Sending links are never given credit by this handler.
+        if link.is_receiver:
+            self.top_up(link)
+
+    def on_link_local_open(self, event):
+        self._top_up_if_receiver(event.link)
+
+    def on_link_remote_open(self, event):
+        self._top_up_if_receiver(event.link)
+
+    def on_link_flow(self, event):
+        self._top_up_if_receiver(event.link)
+
+    def on_delivery(self, event):
+        dlv = event.delivery
+        # A released delivery is skipped before its link is even looked at.
+        if dlv.released:
+            return
+        self._top_up_if_receiver(dlv.link)
+
+def nested_handlers(handlers):
+    """
+    Return a new list holding each handler followed immediately by the
+    entries of its 'handlers' attribute, when it has one.
+    """
+    # currently only allows for a single level of nesting
+    flattened = []
+    for handler in handlers:
+        flattened.append(handler)
+        if not hasattr(handler, 'handlers'):
+            continue
+        flattened += handler.handlers
+    return flattened
+
+def add_nested_handler(handler, nested):
+    """
+    Append nested to handler.handlers, creating that list on the handler
+    if it does not have one yet.
+    """
+    if not hasattr(handler, 'handlers'):
+        handler.handlers = [nested]
+        return
+    handler.handlers.append(nested)
+
+class ScopedHandler(Handler):
+    """
+    Routes events that have no dedicated handler method to the objects
+    stored in the 'context' attribute of the endpoints the event relates
+    to (and to the handlers nested one level beneath those objects).
+    """
+
+    # Maps the event class to the event attributes that are inspected
+    # for a context, in the order in which they are consulted.
+    scopes = {
+        "pn_connection": ["connection"],
+        "pn_session": ["session", "connection"],
+        "pn_link": ["link", "session", "connection"],
+        "pn_delivery": ["delivery", "link", "session", "connection"]
+    }
+
+    def on_unhandled(self, method, args):
+        event = args[0]
+        final_events = [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]
+        if event.type in final_events:
+            # Final events are not routed to any context.
+            return
+
+        # Collect the contexts of the objects in scope for this event.
+        contexts = []
+        for attr in self.scopes.get(event.clazz, []):
+            scoped = getattr(event, attr)
+            if hasattr(scoped, "context"):
+                contexts.append(scoped.context)
+
+        # Resolve every callback first, then invoke them, so the lookup
+        # is complete before any callback runs.
+        callback_name = event.type.method
+        callbacks = []
+        for target in nested_handlers(contexts):
+            if hasattr(target, callback_name):
+                callbacks.append(getattr(target, callback_name))
+        for callback in callbacks:
+            callback(event)
+
+class OutgoingMessageHandler(Handler):
+    """
+    Translates flow and delivery events on sending links into on_credit,
+    on_accepted, on_rejected, on_released, on_modified and on_settled
+    callbacks. Each callback is dispatched to the delegate, if one was
+    supplied. Updated deliveries are settled locally when auto_settle
+    is true.
+    """
+
+    def __init__(self, auto_settle=True, delegate=None):
+        self.delegate = delegate
+        self.auto_settle = auto_settle
+
+    def _notify(self, name, event):
+        # Dispatch the named callback to the delegate, when there is one.
+        if self.delegate:
+            dispatch(self.delegate, name, event)
+
+    def on_link_flow(self, event):
+        link = event.link
+        if link.is_sender and link.credit:
+            self.on_credit(event)
+
+    def on_delivery(self, event):
+        dlv = event.delivery
+        # Only updated deliveries on sending links that have not been
+        # released are of interest here.
+        if dlv.released or not (dlv.link.is_sender and dlv.updated):
+            return
+        outcomes = (
+            (Delivery.ACCEPTED, self.on_accepted),
+            (Delivery.REJECTED, self.on_rejected),
+            (Delivery.RELEASED, self.on_released),
+            (Delivery.MODIFIED, self.on_modified),
+        )
+        remote_state = dlv.remote_state
+        for state, callback in outcomes:
+            if remote_state == state:
+                callback(event)
+                break
+        if dlv.settled:
+            self.on_settled(event)
+        if self.auto_settle:
+            dlv.settle()
+
+    def on_credit(self, event):
+        self._notify('on_credit', event)
+
+    def on_accepted(self, event):
+        self._notify('on_accepted', event)
+
+    def on_rejected(self, event):
+        self._notify('on_rejected', event)
+
+    def on_released(self, event):
+        self._notify('on_released', event)
+
+    def on_modified(self, event):
+        self._notify('on_modified', event)
+
+    def on_settled(self, event):
+        self._notify('on_settled', event)
+
+def recv_msg(delivery):
+    """
+    Read the pending bytes of the delivery from its link, decode them
+    into a new Message, advance the link and return the message.
+    """
+    link = delivery.link
+    encoded = link.recv(delivery.pending)
+    message = Message()
+    message.decode(encoded)
+    link.advance()
+    return message
+
+class Reject(ProtonException):
+    """
+    An exception that indicates a message should be rejected.
+    """
+
+class Acking(object):
+    """
+    Mixin offering helpers that update a delivery with an outcome and
+    then settle it.
+    """
+
+    def accept(self, delivery):
+        """Settle the delivery with the ACCEPTED state."""
+        self.settle(delivery, Delivery.ACCEPTED)
+
+    def reject(self, delivery):
+        """Settle the delivery with the REJECTED state."""
+        self.settle(delivery, Delivery.REJECTED)
+
+    def release(self, delivery, delivered=True):
+        """
+        Settle the delivery as MODIFIED when delivered is true, and as
+        RELEASED otherwise.
+        """
+        outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
+        self.settle(delivery, outcome)
+
+    def settle(self, delivery, state=None):
+        """
+        Settle the delivery, first updating it with state when a
+        (truthy) state is given.
+        """
+        if state:
+            delivery.update(state)
+        delivery.settle()
+
+class IncomingMessageHandler(Handler, Acking):
+    """
+    Turns delivery events on receiving links into on_message and
+    on_settled callbacks, which are dispatched to the delegate if one
+    was supplied. A message is accepted and settled after on_message
+    returns when auto_accept is true, and rejected and settled when
+    on_message raises Reject.
+    """
+
+    def __init__(self, auto_accept=True, delegate=None):
+        self.auto_accept = auto_accept
+        self.delegate = delegate
+
+    def _notify(self, name, event):
+        # Dispatch the named callback to the delegate, when there is one.
+        if self.delegate:
+            dispatch(self.delegate, name, event)
+
+    def on_delivery(self, event):
+        dlv = event.delivery
+        if dlv.released:
+            return
+        if not dlv.link.is_receiver:
+            return
+        if not (dlv.readable and not dlv.partial):
+            # No complete message to read; the only other case handled
+            # is a settlement update for the delivery.
+            if dlv.updated and dlv.settled:
+                self.on_settled(event)
+            return
+        event.message = recv_msg(dlv)
+        try:
+            self.on_message(event)
+            if self.auto_accept:
+                dlv.update(Delivery.ACCEPTED)
+                dlv.settle()
+        except Reject:
+            dlv.update(Delivery.REJECTED)
+            dlv.settle()
+
+    def on_message(self, event):
+        self._notify('on_message', event)
+
+    def on_settled(self, event):
+        self._notify('on_settled', event)
+
+class EndpointStateHandler(Handler):
+    """
+    Examines the local and remote state of connections, sessions and
+    links on open and close events and turns them into the callbacks
+    on_<endpoint>_opened, on_<endpoint>_opening, on_<endpoint>_closed,
+    on_<endpoint>_closing and on_<endpoint>_error. Each callback is
+    dispatched to the delegate when one was supplied.
+
+    An endpoint opened by the peer that has not been initialised locally
+    is opened in response; an endpoint closed by the peer is always
+    closed locally as well.
+    """
+
+    def __init__(self, peer_close_is_error=False, delegate=None):
+        self.peer_close_is_error = peer_close_is_error
+        self.delegate = delegate
+
+    # -- state tests (each returns the masked state bits) --
+
+    def is_local_open(self, endpoint):
+        state = endpoint.state
+        return state & Endpoint.LOCAL_ACTIVE
+
+    def is_local_uninitialised(self, endpoint):
+        state = endpoint.state
+        return state & Endpoint.LOCAL_UNINIT
+
+    def is_local_closed(self, endpoint):
+        state = endpoint.state
+        return state & Endpoint.LOCAL_CLOSED
+
+    def is_remote_open(self, endpoint):
+        state = endpoint.state
+        return state & Endpoint.REMOTE_ACTIVE
+
+    def is_remote_closed(self, endpoint):
+        state = endpoint.state
+        return state & Endpoint.REMOTE_CLOSED
+
+    def print_error(self, endpoint, endpoint_type):
+        """
+        Print the description of the endpoint's remote condition or,
+        failing that, a note that the peer closed the endpoint while it
+        was still open locally.
+        """
+        condition = endpoint.remote_condition
+        if condition:
+            print(condition.description)
+            return
+        if self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
+            print("%s closed by peer" % endpoint_type)
+
+    # -- private helpers shared by the event methods below --
+
+    def _forward(self, name, event):
+        # Dispatch the named callback to the delegate; report whether
+        # there was a delegate to dispatch to.
+        if self.delegate:
+            dispatch(self.delegate, name, event)
+            return True
+        return False
+
+    def _remote_close(self, endpoint, event, on_error, on_closed, on_closing):
+        # Pick the callback matching how the peer closed the endpoint,
+        # then close the endpoint locally.
+        if endpoint.remote_condition:
+            on_error(event)
+        elif self.is_local_closed(endpoint):
+            on_closed(event)
+        else:
+            on_closing(event)
+        endpoint.close()
+
+    def _local_open(self, endpoint, event, on_opened):
+        # A local open only completes the handshake if the peer is open.
+        if self.is_remote_open(endpoint):
+            on_opened(event)
+
+    def _remote_open(self, endpoint, event, on_opened, on_opening):
+        # Either the peer answered our open, or it initiated the open,
+        # in which case the endpoint is opened locally in response.
+        if self.is_local_open(endpoint):
+            on_opened(event)
+        elif self.is_local_uninitialised(endpoint):
+            on_opening(event)
+            endpoint.open()
+
+    # -- raw endpoint events --
+
+    def on_link_remote_close(self, event):
+        self._remote_close(event.link, event,
+                           self.on_link_error, self.on_link_closed, self.on_link_closing)
+
+    def on_session_remote_close(self, event):
+        self._remote_close(event.session, event,
+                           self.on_session_error, self.on_session_closed, self.on_session_closing)
+
+    def on_connection_remote_close(self, event):
+        self._remote_close(event.connection, event,
+                           self.on_connection_error, self.on_connection_closed, self.on_connection_closing)
+
+    def on_connection_local_open(self, event):
+        self._local_open(event.connection, event, self.on_connection_opened)
+
+    def on_connection_remote_open(self, event):
+        self._remote_open(event.connection, event,
+                          self.on_connection_opened, self.on_connection_opening)
+
+    def on_session_local_open(self, event):
+        self._local_open(event.session, event, self.on_session_opened)
+
+    def on_session_remote_open(self, event):
+        self._remote_open(event.session, event,
+                          self.on_session_opened, self.on_session_opening)
+
+    def on_link_local_open(self, event):
+        self._local_open(event.link, event, self.on_link_opened)
+
+    def on_link_remote_open(self, event):
+        self._remote_open(event.link, event,
+                          self.on_link_opened, self.on_link_opening)
+
+    # -- derived callbacks --
+
+    def on_connection_opened(self, event):
+        self._forward('on_connection_opened', event)
+
+    def on_session_opened(self, event):
+        self._forward('on_session_opened', event)
+
+    def on_link_opened(self, event):
+        self._forward('on_link_opened', event)
+
+    def on_connection_opening(self, event):
+        self._forward('on_connection_opening', event)
+
+    def on_session_opening(self, event):
+        self._forward('on_session_opening', event)
+
+    def on_link_opening(self, event):
+        self._forward('on_link_opening', event)
+
+    def on_connection_error(self, event):
+        if not self._forward('on_connection_error', event):
+            self.print_error(event.connection, "connection")
+
+    def on_session_error(self, event):
+        if not self._forward('on_session_error', event):
+            # Without a delegate, a session error closes the connection.
+            self.print_error(event.session, "session")
+            event.connection.close()
+
+    def on_link_error(self, event):
+        if not self._forward('on_link_error', event):
+            # Without a delegate, a link error closes the connection.
+            self.print_error(event.link, "link")
+            event.connection.close()
+
+    def on_connection_closed(self, event):
+        self._forward('on_connection_closed', event)
+
+    def on_session_closed(self, event):
+        self._forward('on_session_closed', event)
+
+    def on_link_closed(self, event):
+        self._forward('on_link_closed', event)
+
+    def on_connection_closing(self, event):
+        # peer_close_is_error only applies when there is no delegate.
+        if not self._forward('on_connection_closing', event) and self.peer_close_is_error:
+            self.on_connection_error(event)
+
+    def on_session_closing(self, event):
+        if not self._forward('on_session_closing', event) and self.peer_close_is_error:
+            self.on_session_error(event)
+
+    def on_link_closing(self, event):
+        if not self._forward('on_link_closing', event) and self.peer_close_is_error:
+            self.on_link_error(event)
+
+class MessagingHandler(Handler, Acking):
+    """
+    Convenience handler that nests a FlowController (when prefetch is
+    truthy), an EndpointStateHandler, an IncomingMessageHandler and an
+    OutgoingMessageHandler, in that order, passing itself as the
+    delegate of the last three.
+    """
+
+    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        chain = [FlowController(prefetch)] if prefetch else []
+        chain.append(EndpointStateHandler(peer_close_is_error, self))
+        chain.append(IncomingMessageHandler(auto_accept, self))
+        chain.append(OutgoingMessageHandler(auto_settle, self))
+        self.handlers = chain
+
+class TransactionalAcking(object):
+    """
+    Mixin whose accept() hands the delivery to a transaction instead of
+    updating the delivery directly.
+    """
+    def accept(self, delivery, transaction, settle=True):
+        """Delegate to transaction.accept(), passing on the delivery and settle flag."""
+        transaction.accept(delivery, settle)
+
+class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
+    """
+    An OutgoingMessageHandler that, on settlement of a delivery carrying
+    a 'transaction' attribute, hands the event to that transaction, and
+    that dispatches the on_transaction_* callbacks to the delegate (if
+    one was supplied).
+    """
+
+    def __init__(self, auto_settle=True, delegate=None):
+        super(TransactionHandler, self).__init__(auto_settle, delegate)
+
+    def _notify_transaction(self, name, event):
+        # Dispatch the named transaction callback to the delegate, if any.
+        if self.delegate:
+            dispatch(self.delegate, name, event)
+
+    def on_settled(self, event):
+        dlv = event.delivery
+        if not hasattr(dlv, "transaction"):
+            # Deliveries without a transaction attribute are ignored here.
+            return
+        event.transaction = dlv.transaction
+        dlv.transaction.handle_outcome(event)
+
+    def on_transaction_declared(self, event):
+        self._notify_transaction('on_transaction_declared', event)
+
+    def on_transaction_committed(self, event):
+        self._notify_transaction('on_transaction_committed', event)
+
+    def on_transaction_aborted(self, event):
+        self._notify_transaction('on_transaction_aborted', event)
+
+    def on_transaction_declare_failed(self, event):
+        self._notify_transaction('on_transaction_declare_failed', event)
+
+    def on_transaction_commit_failed(self, event):
+        self._notify_transaction('on_transaction_commit_failed', event)
+
+class TransactionalClientHandler(Handler, TransactionalAcking):
+    """
+    Convenience handler that nests a FlowController (when prefetch is
+    truthy), an EndpointStateHandler, an IncomingMessageHandler and a
+    TransactionHandler, in that order, passing itself as the delegate of
+    the last three. Unlike MessagingHandler's signature in this file,
+    auto_accept defaults to False.
+    """
+
+    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
+        super(TransactionalClientHandler, self).__init__()
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        chain = [FlowController(prefetch)] if prefetch else []
+        chain.append(EndpointStateHandler(peer_close_is_error, self))
+        chain.append(IncomingMessageHandler(auto_accept, self))
+        chain.append(TransactionHandler(auto_settle, self))
+        self.handlers = chain

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0fd8b1fc/tutorial/proton_utils.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_utils.py b/tutorial/proton_utils.py
new file mode 100644
index 0000000..fce0cf5
--- /dev/null
+++ b/tutorial/proton_utils.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import Queue, socket, time
+from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton_reactors import AmqpSocket, Events, MessagingContext, SelectLoop, send_msg
+from proton_handlers import ScopedHandler
+
+class BlockingLink(object):
+    """
+    Wraps a link together with the connection object used to wait on it.
+
+    Constructing the wrapper blocks, via connection.wait(), until the
+    link's state no longer has the REMOTE_UNINIT bit set. Attributes not
+    found on the wrapper are looked up on the wrapped link.
+    """
+    def __init__(self, connection, link):
+        self.connection = connection
+        self.link = link
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+                             msg="Opening link %s" % link.name)
+
+    def close(self):
+        """
+        Close the link and block until its state no longer has the
+        REMOTE_ACTIVE bit set.
+        """
+        self.link.close()
+        # wait() expects a callable it can re-evaluate, so the test is
+        # wrapped in a lambda rather than evaluated once up front. The
+        # link must be reached through self: there is no 'link' name in
+        # scope in this method.
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
+                             msg="Closing link %s" % self.link.name)
+
+    # Access to other link attributes.
+    def __getattr__(self, name): return getattr(self.link, name)
+
+class BlockingSender(BlockingLink):
+    """A BlockingLink for a sender whose send_msg() waits for settlement."""
+
+    def __init__(self, connection, sender):
+        super(BlockingSender, self).__init__(connection, sender)
+
+    def send_msg(self, msg):
+        """
+        Send msg on the wrapped link using the module-level send_msg()
+        and block until the resulting delivery reports settled.
+        """
+        sent = send_msg(self.link, msg)
+        description = "Sending on sender %s" % self.link.name
+        self.connection.wait(lambda: sent.settled, msg=description)
+
+class BlockingReceiver(BlockingLink):
+    """A BlockingLink for a receiver that is given initial credit once open."""
+
+    def __init__(self, connection, receiver, credit=1):
+        super(BlockingReceiver, self).__init__(connection, receiver)
+        # A falsy credit (0 or None) means no credit is issued here.
+        if not credit:
+            return
+        receiver.flow(credit)
+
+class BlockingConnection(Handler):
+    """
+    A connection offering a blocking style of use: the constructor and
+    close() drive the event loop themselves, through wait(), until the
+    peer has responded.
+
+    url may be a string (converted with Url()) or an object that already
+    provides host and port attributes. timeout is the default number of
+    seconds wait() allows before raising Timeout; None means wait
+    indefinitely.
+    """
+    def __init__(self, url, timeout=None):
+        self.timeout = timeout
+        self.events = Events(ScopedHandler())
+        self.loop = SelectLoop(self.events)
+        self.context = MessagingContext(self.loop.events.connection(), handler=self)
+        if isinstance(url, basestring):
+            self.url = Url(url)
+        else:
+            self.url = url
+        self.loop.add(
+            AmqpSocket(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
+        self.context.conn.open()
+        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT),
+                  msg="Opening connection")
+
+    def create_sender(self, address, handler=None):
+        """Create a sender for address and return it wrapped in a BlockingSender."""
+        return BlockingSender(self, self.context.create_sender(address, handler=handler))
+
+    def create_receiver(self, address, credit=1, dynamic=False, handler=None):
+        """Create a receiver for address and return it wrapped in a BlockingReceiver."""
+        return BlockingReceiver(
+            self, self.context.create_receiver(address, dynamic=dynamic, handler=handler), credit=credit)
+
+    def close(self):
+        """Close the connection and block until it is no longer active remotely."""
+        self.context.conn.close()
+        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE),
+                  msg="Closing connection")
+
+    def run(self):
+        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
+        self.loop.run()
+
+    def wait(self, condition, timeout=False, msg=None):
+        """
+        Call do_work until condition() is true.
+
+        timeout=False (the default) selects the timeout given to the
+        constructor; timeout=None waits indefinitely. Otherwise Timeout
+        is raised, with msg appended to its text when given, as soon as
+        do_work() returns a false value while condition() is still not
+        true.
+        """
+        if timeout is False:
+            timeout = self.timeout
+        if timeout is None:
+            while not condition():
+                self.loop.do_work()
+        else:
+            deadline = time.time() + timeout
+            while not condition():
+                # Once the deadline has passed the difference is negative;
+                # clamp it so the loop is never handed a negative timeout.
+                # NOTE(review): assumes do_work(0) polls without blocking
+                # and returns a false value when nothing happened - confirm
+                # against SelectLoop.do_work.
+                remaining = max(0, deadline - time.time())
+                if not self.loop.do_work(remaining):
+                    txt = "Connection %s timed out" % self.url
+                    if msg: txt += ": " + msg
+                    raise Timeout(txt)
+
+    def on_link_remote_close(self, event):
+        # A close from the peer is only an error while we are still open.
+        if event.link.state & Endpoint.LOCAL_ACTIVE:
+            self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        if event.connection.state & Endpoint.LOCAL_ACTIVE:
+            self.closed(event.connection.remote_condition)
+
+    def on_disconnected(self, event):
+        raise ConnectionException("Connection %s disconnected" % self.url)
+
+    def closed(self, error=None):
+        """
+        Raise ConnectionException reporting that the connection was
+        closed, naming error as the cause when one is given.
+        """
+        txt = "Connection %s closed" % self.url
+        if error:
+            txt += " due to: %s" % error
+        else:
+            txt += " by peer"
+        raise ConnectionException(txt)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org