You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/10 21:35:15 UTC

svn commit: r1624104 - in /qpid/proton/branches/examples/tutorial: db_common.py db_ctrl.py db_recv.py db_send.py proton_events.py

Author: gsim
Date: Wed Sep 10 19:35:14 2014
New Revision: 1624104

URL: http://svn.apache.org/r1624104
Log:
Added another example, transfer of records from one db to another

Added:
    qpid/proton/branches/examples/tutorial/db_common.py
    qpid/proton/branches/examples/tutorial/db_ctrl.py   (with props)
    qpid/proton/branches/examples/tutorial/db_recv.py   (with props)
    qpid/proton/branches/examples/tutorial/db_send.py   (with props)
Modified:
    qpid/proton/branches/examples/tutorial/proton_events.py

Added: qpid/proton/branches/examples/tutorial/db_common.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_common.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_common.py (added)
+++ qpid/proton/branches/examples/tutorial/db_common.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import sqlite3
+import threading
+
+class Db(object):
+    def __init__(self, db, events):
+        self.db = db
+        self.events = events
+        self.tasks = Queue.Queue()
+        self.position = None
+        self.thread = threading.Thread(target=self._process)
+        self.thread.daemon=True
+        self.thread.start()
+
+    def reset(self):
+        self.tasks.put(lambda conn: self._reset())
+
+    def load(self, records, event=None):
+        self.tasks.put(lambda conn: self._load(conn, records, event))
+
+    def insert(self, id, data, event=None):
+        self.tasks.put(lambda conn: self._insert(conn, id, data, event))
+
+    def delete(self, id, event=None):
+        self.tasks.put(lambda conn: self._delete(conn, id, event))
+
+    def _reset(self, ignored=None):
+        self.position = None
+
+    def _load(self, conn, records, event):
+        if self.position:
+            cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
+        else:
+            cursor = conn.execute("SELECT * FROM records ORDER BY id")
+        while not records.full():
+            row = cursor.fetchone()
+            if row:
+                self.position = row['id']
+                records.put(dict(row))
+            else:
+                break
+        if event:
+            self.events.trigger(event)
+
+    def _insert(self, conn, id, data, event):
+        if id:
+            conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
+        else:
+            conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
+        conn.commit()
+        if event:
+            self.events.trigger(event)
+
+    def _delete(self, conn, id, event):
+        conn.execute("DELETE FROM records WHERE id=?", (id,))
+        conn.commit()
+        if event:
+            self.events.trigger(event)
+
+    def _process(self):
+        conn = sqlite3.connect(self.db)
+        conn.row_factory = sqlite3.Row
+        with conn:
+            while True:
+                f = self.tasks.get(True)
+                f(conn)

Added: qpid/proton/branches/examples/tutorial/db_ctrl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_ctrl.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_ctrl.py (added)
+++ qpid/proton/branches/examples/tutorial/db_ctrl.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlite3
+import sys
+
+if len(sys.argv) < 3:
+    print "Usage: %s [init|insert|list] db" % sys.argv[0]
+else:
+    conn = sqlite3.connect(sys.argv[2])
+    with conn:
+        if sys.argv[1] == "init":
+            conn.execute("DROP TABLE IF EXISTS records")
+            conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
+            conn.commit()
+        elif sys.argv[1] == "list":
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM records")
+            rows = cursor.fetchall()
+            for r in rows:
+                print r
+        elif sys.argv[1] == "insert":
+            while True:
+                l = sys.stdin.readline()
+                if not l: break
+                conn.execute("INSERT INTO records(description) VALUES (?)", (l.rstrip(),))
+            conn.commit()
+        else:
+            print "Unrecognised command: %s" %  sys.argv[1]

Propchange: qpid/proton/branches/examples/tutorial/db_ctrl.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/branches/examples/tutorial/db_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_recv.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_recv.py (added)
+++ qpid/proton/branches/examples/tutorial/db_recv.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton_events import ApplicationEvent, IncomingMessageHandler, EventLoop, FlowController
+from db_common import Db
+
+class Recv(IncomingMessageHandler):
+    def __init__(self, host, address):
+        self.eventloop = EventLoop()#self, FlowController(10))
+        self.host = host
+        self.address = address
+        self.delay = 0
+        self.db = Db("dst_db", self.eventloop.get_event_trigger())
+        # TODO: load last tag from db
+        self.last_id = None
+        self.connect()
+
+    def connect(self):
+        self.conn = self.eventloop.connect(self.host, handler=self)
+
+    def auto_accept(self): return False
+
+    def on_record_inserted(self, event):
+        self.accept(event.delivery)
+
+    def on_message(self, event):
+        id = int(event.message.id)
+        if (not self.last_id) or id > self.last_id:
+            self.last_id = id
+            self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+            print "inserted message %s" % id
+        else:
+            self.accept(event.delivery)
+
+    def on_connection_remote_open(self, event):
+        self.delay = 0
+        self.conn.receiver(self.address)
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+
+    def closed(self, error=None):
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+
+    def on_disconnected(self, conn):
+        if self.delay == 0:
+            self.delay = 0.1
+            print "Disconnected, reconnecting..."
+            self.connect()
+        else:
+            print "Disconnected will try to reconnect after %d seconds" % self.delay
+            self.eventloop.schedule(time.time() + self.delay, connection=conn)
+            self.delay = min(10, 2*self.delay)
+
+    def on_timer(self, event):
+        print "Reconnecting..."
+        self.connect()
+
+    def run(self):
+        self.eventloop.run()
+
+try:
+    Recv("localhost:5672", "examples").run()
+except KeyboardInterrupt: pass
+
+
+

Propchange: qpid/proton/branches/examples/tutorial/db_recv.py
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/branches/examples/tutorial/db_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_send.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_send.py (added)
+++ qpid/proton/branches/examples/tutorial/db_send.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import time
+from proton import Message
+from proton_events import ApplicationEvent, EventLoop, OutgoingMessageHandler
+from db_common import Db
+
+class Send(OutgoingMessageHandler):
+    def __init__(self, host, address):
+        self.eventloop = EventLoop()
+        self.address = address
+        self.host = host
+        self.delay = 0
+        self.sent = 0
+        self.records = Queue.Queue(maxsize=50)
+        self.db = Db("src_db", self.eventloop.get_event_trigger())
+        self.connect()
+
+    def connect(self):
+        self.conn = self.eventloop.connect(self.host, handler=self)
+
+    def on_records_loaded(self, event):
+        if self.records.empty() and event.subject == self.sent:
+            print "Exhausted available data, waiting to recheck..."
+            # check for new data after 5 seconds
+            self.eventloop.schedule(time.time() + 5, link=self.sender, subject="data")
+        else:
+            self.send()
+
+    def request_records(self):
+        if not self.records.full():
+            self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent))
+
+    def on_link_flow(self, event):
+        self.send()
+
+    def send(self):
+        while self.sender.credit and not self.records.empty():
+            record = self.records.get(False)
+            id = record['id']
+            self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+            self.sent += 1
+            print "sent message %s" % id
+        self.request_records()
+
+    def on_settled(self, event):
+        id = int(event.delivery.tag)
+        self.db.delete(id)
+        print "settled message %s" % id
+
+    def on_connection_remote_open(self, event):
+        self.db.reset()
+        self.sender = self.conn.sender(self.address)
+        self.delay = 0
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+
+    def closed(self, error=None):
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+
+    def on_disconnected(self, conn):
+        if self.delay == 0:
+            self.delay = 0.1
+            print "Disconnected, reconnecting..."
+            self.connect()
+        else:
+            print "Disconnected will try to reconnect after %d seconds" % self.delay
+            self.eventloop.schedule(time.time() + self.delay, connection=conn, subject="reconnect")
+            self.delay = min(10, 2*self.delay)
+
+    def on_timer(self, event):
+        if event.subject == "reconnect":
+            print "Reconnecting..."
+            self.connect()
+        elif event.subject == "data":
+            print "Rechecking for data..."
+            self.request_records()
+
+    def run(self):
+        self.eventloop.run()
+
+try:
+    Send("localhost:5672", "examples").run()
+except KeyboardInterrupt: pass
+

Propchange: qpid/proton/branches/examples/tutorial/db_send.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1624104&r1=1624103&r2=1624104&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Sep 10 19:35:14 2014
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import os, heapq, socket, time, types
+import heapq, os, Queue, socket, time, types
 from proton import Collector, Connection, Delivery, Endpoint, Event
 from proton import Message, ProtonException, Transport, TransportException
 from select import select
@@ -51,7 +51,7 @@ class EventDispatcher(object):
     }
 
     def dispatch(self, event):
-        getattr(self, self.methods.get(event.type, str(event.type)), self.unhandled)(event)
+        getattr(self, self.methods.get(event.type, "on_%s" % str(event.type)), self.unhandled)(event)
 
     def unhandled(self, event):
         pass
@@ -206,6 +206,40 @@ class Acceptor:
 
     def removed(self): pass
 
+class EventInjector(object):
+    def __init__(self, events):
+        self.events = events
+        self.queue = Queue.Queue()
+        self.pipe = os.pipe()
+        self._closed = False
+
+    def trigger(self, event):
+        self.queue.put(event)
+        os.write(self.pipe[1], "!")
+
+    def closed(self):
+        return self._closed and self.queue.empty()
+
+    def close(self):
+        self._closed = True
+
+    def fileno(self):
+        return self.pipe[0]
+
+    def reading(self):
+        return True
+
+    def writing(self):
+        return False
+
+    def readable(self):
+        os.read(self.pipe[0], 512)
+        while not self.queue.empty():
+            self.events.dispatch(self.queue.get())
+
+    def removed(self): pass
+
+
 class Events(object):
     def __init__(self, *dispatchers):
         self.collector = Collector()
@@ -237,41 +271,49 @@ class Events(object):
     def empty(self):
         return self.collector.peek() == None
 
-class TimerEvent(Event):
-    def __init__(self, connection=None, session=None, link=None, delivery=None):
-        self.type = "on_timer"
+class ApplicationEvent(Event):
+    CATEGORY_GENERAL = "general"
+
+    def __init__(self, type, connection=None, session=None, link=None, delivery=None, subject=None):
+        self.type = type
         self.transport = None
+        self.subject = subject
         if delivery:
             self.delivery = delivery
             self.link = delivery.link
-            self.session = link.session
-            self.connection = session.connection
+            self.session = self.link.session
+            self.connection = self.session.connection
             self.category = Event.CATEGORY_DELIVERY
         elif link:
             self.delivery = None
             self.link = link
-            self.session = link.session
-            self.connection = session.connection
+            self.session = self.link.session
+            self.connection = self.session.connection
             self.category = Event.CATEGORY_LINK
         elif session:
             self.delivery = None
             self.link = None
             self.session = session
-            self.connection = session.connection
+            self.connection = self.session.connection
             category = Event.CATEGORY_SESSION
-        else:
+        elif connection:
             self.delivery = None
             self.link = None
             self.session = None
             self.connection = connection
             self.category = Event.CATEGORY_CONNECTION
+        else:
+            self.delivery = None
+            self.link = None
+            self.session = None
+            self.connection = None
+            self.category = ApplicationEvent.CATEGORY_GENERAL
 
     def __repr__(self):
-        objects = [self.connection, self.session, self.link, self.delivery]
+        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
         return "%s(%s)" % (self.type,
                            ", ".join([str(o) for o in objects if o is not None]))
 
-
 class ScheduledEvents(Events):
     def __init__(self, *dispatchers):
         super(ScheduledEvents, self).__init__(*dispatchers)
@@ -422,7 +464,7 @@ class ScopedDispatcher(EventDispatcher):
     }
 
     def dispatch(self, event):
-        method = self.methods.get(event.type, str(event.type))
+        method = self.methods.get(event.type, "on_%s" % str(event.type))
         objects = [getattr(event, attr) for attr in self.scopes.get(event.category, [])]
         targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
         handlers = [getattr(t, method) for t in targets if hasattr(t, method)]
@@ -590,6 +632,7 @@ class EventLoop(object):
         else: l.append(FlowController(10))
         self.events = ScheduledEvents(*l)
         self.loop = SelectLoop(self.events)
+        self.trigger = None
 
     def connect(self, url, name=None, handler=None):
         identifier = name or url
@@ -606,14 +649,19 @@ class EventLoop(object):
         if port: port = int(port)
         return Acceptor(self.loop.events, self.loop.selectables, host, port)
 
-    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None):
-        self.events.schedule(deadline, TimerEvent(connection, session, link, delivery))
+    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+        self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+    def get_event_trigger(self):
+        if not self.trigger or self.trigger.closed():
+            self.trigger = EventInjector(self.events)
+            self.loop.selectables.append(self.trigger)
+        return self.trigger
 
     def run(self):
         self.loop.run()
 
 
-
 class BlockingLink(object):
     def __init__(self, connection, link):
         self.connection = connection



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org