You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/09/10 21:35:15 UTC
svn commit: r1624104 - in /qpid/proton/branches/examples/tutorial:
db_common.py db_ctrl.py db_recv.py db_send.py proton_events.py
Author: gsim
Date: Wed Sep 10 19:35:14 2014
New Revision: 1624104
URL: http://svn.apache.org/r1624104
Log:
Added another example, transfer of records from one db to another
Added:
qpid/proton/branches/examples/tutorial/db_common.py
qpid/proton/branches/examples/tutorial/db_ctrl.py (with props)
qpid/proton/branches/examples/tutorial/db_recv.py (with props)
qpid/proton/branches/examples/tutorial/db_send.py (with props)
Modified:
qpid/proton/branches/examples/tutorial/proton_events.py
Added: qpid/proton/branches/examples/tutorial/db_common.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_common.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_common.py (added)
+++ qpid/proton/branches/examples/tutorial/db_common.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import sqlite3
+import threading
+
+class Db(object):
+    """Runs sqlite operations for one database on a dedicated worker thread.
+
+    The public methods only enqueue a task and return immediately. The worker
+    thread owns the sqlite connection and executes the queued tasks in order.
+    When a task was given an ``event``, that event is handed to
+    ``events.trigger()`` once the task has completed.
+    """
+
+    def __init__(self, db, events):
+        """Remember the database and start the (daemon) worker thread.
+
+        db     -- database name passed to sqlite3.connect()
+        events -- object providing trigger(event), used to report completion
+        """
+        self.db = db
+        self.events = events
+        self.tasks = Queue.Queue()
+        # id of the last record handed out by _load(); None means "start over"
+        self.position = None
+        self.thread = threading.Thread(target=self._process)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def reset(self):
+        """Queue a request to make the next load start from the first record."""
+        self.tasks.put(lambda conn: self._reset())
+
+    def load(self, records, event=None):
+        """Queue a request to fill the ``records`` queue from the database."""
+        self.tasks.put(lambda conn: self._load(conn, records, event))
+
+    def insert(self, id, data, event=None):
+        """Queue insertion of a record.
+
+        When ``id`` is None the id column is left out of the INSERT.
+        """
+        self.tasks.put(lambda conn: self._insert(conn, id, data, event))
+
+    def delete(self, id, event=None):
+        """Queue deletion of the record with the given id."""
+        self.tasks.put(lambda conn: self._delete(conn, id, event))
+
+    def _reset(self, ignored=None):
+        self.position = None
+
+    def _load(self, conn, records, event):
+        """Copy rows after the current position into ``records`` until full."""
+        # Compare against None explicitly so that a record with id 0 is a
+        # valid position rather than a request to start from the beginning.
+        if self.position is not None:
+            cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
+        else:
+            cursor = conn.execute("SELECT * FROM records ORDER BY id")
+        while not records.full():
+            row = cursor.fetchone()
+            if row is None:
+                break
+            self.position = row['id']
+            records.put(dict(row))
+        if event:
+            self.events.trigger(event)
+
+    def _insert(self, conn, id, data, event):
+        """Insert one record, commit, then report completion."""
+        # "is not None" so that an explicit id of 0 is stored as given
+        if id is not None:
+            conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
+        else:
+            conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
+        conn.commit()
+        if event:
+            self.events.trigger(event)
+
+    def _delete(self, conn, id, event):
+        """Delete one record, commit, then report completion."""
+        conn.execute("DELETE FROM records WHERE id=?", (id,))
+        conn.commit()
+        if event:
+            self.events.trigger(event)
+
+    def _process(self):
+        """Worker loop: open the connection and run queued tasks forever."""
+        import traceback
+        conn = sqlite3.connect(self.db)
+        conn.row_factory = sqlite3.Row
+        with conn:
+            while True:
+                f = self.tasks.get(True)
+                try:
+                    f(conn)
+                except Exception:
+                    # A failing task (e.g. a duplicate id on insert) must not
+                    # kill the only worker thread, or every later request
+                    # would be silently ignored. Report it, discard any
+                    # uncommitted work and carry on with the next task.
+                    traceback.print_exc()
+                    conn.rollback()
Added: qpid/proton/branches/examples/tutorial/db_ctrl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_ctrl.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_ctrl.py (added)
+++ qpid/proton/branches/examples/tutorial/db_ctrl.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlite3
+import sys
+
+# Command line tool: <command> <db>, where command is init, list or insert.
+if len(sys.argv) < 3:
+    print "Usage: %s [init|insert|list] db" % sys.argv[0]
+    # signal the misuse to the calling shell
+    sys.exit(1)
+else:
+    conn = sqlite3.connect(sys.argv[2])
+    with conn:
+        if sys.argv[1] == "init":
+            # (re)create an empty records table
+            conn.execute("DROP TABLE IF EXISTS records")
+            conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
+            conn.commit()
+        elif sys.argv[1] == "list":
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM records")
+            for r in cursor.fetchall():
+                print r
+        elif sys.argv[1] == "insert":
+            # one record per line of standard input, until end of input
+            for l in iter(sys.stdin.readline, ""):
+                conn.execute("INSERT INTO records(description) VALUES (?)", (l.rstrip(),))
+            conn.commit()
+        else:
+            print "Unrecognised command: %s" % sys.argv[1]
+            sys.exit(1)
Propchange: qpid/proton/branches/examples/tutorial/db_ctrl.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/db_recv.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_recv.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_recv.py (added)
+++ qpid/proton/branches/examples/tutorial/db_recv.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from proton_events import ApplicationEvent, IncomingMessageHandler, EventLoop, FlowController
+from db_common import Db
+
+class Recv(IncomingMessageHandler):
+    """Receives records from ``address`` and stores them in the "dst_db" database.
+
+    A delivery is accepted only after its record has been inserted, or
+    straight away when its id is not newer than the last one seen. After a
+    disconnect the connection is re-established, with a growing delay
+    between attempts.
+    """
+
+    def __init__(self, host, address):
+        """Set up the event loop and database, then start connecting."""
+        self.eventloop = EventLoop()
+        self.host = host
+        self.address = address
+        # current reconnect delay in seconds; 0 means "not backing off"
+        self.delay = 0
+        self.db = Db("dst_db", self.eventloop.get_event_trigger())
+        # TODO: load last tag from db
+        self.last_id = None
+        self.connect()
+
+    def connect(self):
+        self.conn = self.eventloop.connect(self.host, handler=self)
+
+    def auto_accept(self):
+        # deliveries are accepted explicitly, see on_record_inserted()
+        return False
+
+    def on_record_inserted(self, event):
+        """The record is stored, so the delivery can now be accepted."""
+        self.accept(event.delivery)
+
+    def on_message(self, event):
+        """Store a new record, or accept a message that was already seen."""
+        id = int(event.message.id)
+        # "is None" rather than "not": an id of 0 is a valid last id
+        if self.last_id is None or id > self.last_id:
+            self.last_id = id
+            self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+            print "inserted message %s" % id
+        else:
+            self.accept(event.delivery)
+
+    def on_connection_remote_open(self, event):
+        # connected again: clear the backoff and start receiving
+        self.delay = 0
+        self.conn.receiver(self.address)
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+
+    def closed(self, error=None):
+        """Report the error, if any, and close the connection."""
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+
+    def on_disconnected(self, conn):
+        """Reconnect at once the first time, then with a doubling delay."""
+        if self.delay == 0:
+            self.delay = 0.1
+            print "Disconnected, reconnecting..."
+            self.connect()
+        else:
+            # %s rather than %d: the delay starts out fractional and %d
+            # would report it as "0 seconds"
+            print "Disconnected will try to reconnect after %s seconds" % self.delay
+            self.eventloop.schedule(time.time() + self.delay, connection=conn)
+            # double the delay for the next attempt, up to 10 seconds
+            self.delay = min(10, 2*self.delay)
+
+    def on_timer(self, event):
+        print "Reconnecting..."
+        self.connect()
+
+    def run(self):
+        self.eventloop.run()
+
+try:
+    Recv("localhost:5672", "examples").run()
+except KeyboardInterrupt:
+    # interrupting the example from the keyboard is not an error
+    pass
+
+
+
Propchange: qpid/proton/branches/examples/tutorial/db_recv.py
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/proton/branches/examples/tutorial/db_send.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_send.py?rev=1624104&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/db_send.py (added)
+++ qpid/proton/branches/examples/tutorial/db_send.py Wed Sep 10 19:35:14 2014
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import time
+from proton import Message
+from proton_events import ApplicationEvent, EventLoop, OutgoingMessageHandler
+from db_common import Db
+
+class Send(OutgoingMessageHandler):
+    """Sends the records held in the "src_db" database to ``address``.
+
+    Records are loaded through a bounded queue, sent while the link has
+    credit, and deleted from the database once their delivery is settled.
+    After a disconnect the connection is re-established, with a growing
+    delay between attempts.
+    """
+
+    def __init__(self, host, address):
+        """Set up the event loop and database, then start connecting."""
+        self.eventloop = EventLoop()
+        self.address = address
+        self.host = host
+        # current reconnect delay in seconds; 0 means "not backing off"
+        self.delay = 0
+        # number of messages sent so far
+        self.sent = 0
+        self.records = Queue.Queue(maxsize=50)
+        self.db = Db("src_db", self.eventloop.get_event_trigger())
+        self.connect()
+
+    def connect(self):
+        self.conn = self.eventloop.connect(self.host, handler=self)
+
+    def on_records_loaded(self, event):
+        """Send what was loaded, or schedule a recheck if nothing is left."""
+        # event.subject is the value self.sent had when the load was
+        # requested; if it is unchanged and the queue is empty, the load
+        # produced nothing new.
+        if self.records.empty() and event.subject == self.sent:
+            print "Exhausted available data, waiting to recheck..."
+            # check for new data after 5 seconds
+            self.eventloop.schedule(time.time() + 5, link=self.sender, subject="data")
+        else:
+            self.send()
+
+    def request_records(self):
+        """Ask the database to top up the record queue unless it is full."""
+        if not self.records.full():
+            self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent))
+
+    def on_link_flow(self, event):
+        self.send()
+
+    def send(self):
+        """Send queued records while there is credit, then request more."""
+        while self.sender.credit and not self.records.empty():
+            record = self.records.get(False)
+            id = record['id']
+            # the record id doubles as the delivery tag, see on_settled()
+            self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+            self.sent += 1
+            print "sent message %s" % id
+        self.request_records()
+
+    def on_settled(self, event):
+        """Remove a record from the database once its delivery is settled."""
+        id = int(event.delivery.tag)
+        self.db.delete(id)
+        print "settled message %s" % id
+
+    def on_connection_remote_open(self, event):
+        # start again from the first record still in the database
+        self.db.reset()
+        self.sender = self.conn.sender(self.address)
+        self.delay = 0
+
+    def on_link_remote_close(self, event):
+        self.closed(event.link.remote_condition)
+
+    def on_connection_remote_close(self, event):
+        self.closed(event.connection.remote_condition)
+
+    def closed(self, error=None):
+        """Report the error, if any, and close the connection."""
+        if error:
+            print "Closed due to %s" % error
+        self.conn.close()
+
+    def on_disconnected(self, conn):
+        """Reconnect at once the first time, then with a doubling delay."""
+        if self.delay == 0:
+            self.delay = 0.1
+            print "Disconnected, reconnecting..."
+            self.connect()
+        else:
+            # %s rather than %d: the delay starts out fractional and %d
+            # would report it as "0 seconds"
+            print "Disconnected will try to reconnect after %s seconds" % self.delay
+            self.eventloop.schedule(time.time() + self.delay, connection=conn, subject="reconnect")
+            # double the delay for the next attempt, up to 10 seconds
+            self.delay = min(10, 2*self.delay)
+
+    def on_timer(self, event):
+        """Handle the two kinds of timer scheduled by this class."""
+        if event.subject == "reconnect":
+            print "Reconnecting..."
+            self.connect()
+        elif event.subject == "data":
+            print "Rechecking for data..."
+            self.request_records()
+
+    def run(self):
+        self.eventloop.run()
+
+try:
+    Send("localhost:5672", "examples").run()
+except KeyboardInterrupt:
+    # interrupting the example from the keyboard is not an error
+    pass
+
Propchange: qpid/proton/branches/examples/tutorial/db_send.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1624104&r1=1624103&r2=1624104&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Sep 10 19:35:14 2014
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-import os, heapq, socket, time, types
+import heapq, os, Queue, socket, time, types
from proton import Collector, Connection, Delivery, Endpoint, Event
from proton import Message, ProtonException, Transport, TransportException
from select import select
@@ -51,7 +51,7 @@ class EventDispatcher(object):
}
def dispatch(self, event):
- getattr(self, self.methods.get(event.type, str(event.type)), self.unhandled)(event)
+ getattr(self, self.methods.get(event.type, "on_%s" % str(event.type)), self.unhandled)(event)
def unhandled(self, event):
pass
@@ -206,6 +206,40 @@ class Acceptor:
def removed(self): pass
+class EventInjector(object):
+    """Lets code outside the event loop hand events to it.
+
+    trigger() queues an event and writes a byte to a pipe. The read end of
+    that pipe is what fileno() exposes, and readable() drains the pipe and
+    dispatches every queued event through ``events.dispatch()``.
+    """
+
+    def __init__(self, events):
+        self.events = events
+        self.queue = Queue.Queue()
+        # (read end, write end)
+        self.pipe = os.pipe()
+        self._closed = False
+
+    def trigger(self, event):
+        """Queue ``event`` and signal the read end of the pipe."""
+        self.queue.put(event)
+        os.write(self.pipe[1], "!")
+
+    def closed(self):
+        # only counts as closed once all queued events have been dispatched
+        return self._closed and self.queue.empty()
+
+    def close(self):
+        self._closed = True
+
+    def fileno(self):
+        return self.pipe[0]
+
+    def reading(self):
+        return True
+
+    def writing(self):
+        return False
+
+    def readable(self):
+        """Drain the signalling bytes, then dispatch all queued events."""
+        os.read(self.pipe[0], 512)
+        while not self.queue.empty():
+            self.events.dispatch(self.queue.get())
+
+    def removed(self):
+        pass
+
+
class Events(object):
def __init__(self, *dispatchers):
self.collector = Collector()
@@ -237,41 +271,49 @@ class Events(object):
def empty(self):
return self.collector.peek() == None
-class TimerEvent(Event):
- def __init__(self, connection=None, session=None, link=None, delivery=None):
- self.type = "on_timer"
+class ApplicationEvent(Event):
+ CATEGORY_GENERAL = "general"
+
+ def __init__(self, type, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.type = type
self.transport = None
+ self.subject = subject
if delivery:
self.delivery = delivery
self.link = delivery.link
- self.session = link.session
- self.connection = session.connection
+ self.session = self.link.session
+ self.connection = self.session.connection
self.category = Event.CATEGORY_DELIVERY
elif link:
self.delivery = None
self.link = link
- self.session = link.session
- self.connection = session.connection
+ self.session = self.link.session
+ self.connection = self.session.connection
self.category = Event.CATEGORY_LINK
elif session:
self.delivery = None
self.link = None
self.session = session
- self.connection = session.connection
+ self.connection = self.session.connection
category = Event.CATEGORY_SESSION
- else:
+ elif connection:
self.delivery = None
self.link = None
self.session = None
self.connection = connection
self.category = Event.CATEGORY_CONNECTION
+ else:
+ self.delivery = None
+ self.link = None
+ self.session = None
+ self.connection = None
+ self.category = ApplicationEvent.CATEGORY_GENERAL
def __repr__(self):
- objects = [self.connection, self.session, self.link, self.delivery]
+ objects = [self.connection, self.session, self.link, self.delivery, self.subject]
return "%s(%s)" % (self.type,
", ".join([str(o) for o in objects if o is not None]))
-
class ScheduledEvents(Events):
def __init__(self, *dispatchers):
super(ScheduledEvents, self).__init__(*dispatchers)
@@ -422,7 +464,7 @@ class ScopedDispatcher(EventDispatcher):
}
def dispatch(self, event):
- method = self.methods.get(event.type, str(event.type))
+ method = self.methods.get(event.type, "on_%s" % str(event.type))
objects = [getattr(event, attr) for attr in self.scopes.get(event.category, [])]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
handlers = [getattr(t, method) for t in targets if hasattr(t, method)]
@@ -590,6 +632,7 @@ class EventLoop(object):
else: l.append(FlowController(10))
self.events = ScheduledEvents(*l)
self.loop = SelectLoop(self.events)
+ self.trigger = None
def connect(self, url, name=None, handler=None):
identifier = name or url
@@ -606,14 +649,19 @@ class EventLoop(object):
if port: port = int(port)
return Acceptor(self.loop.events, self.loop.selectables, host, port)
- def schedule(self, deadline, connection=None, session=None, link=None, delivery=None):
- self.events.schedule(deadline, TimerEvent(connection, session, link, delivery))
+ def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+
+ def get_event_trigger(self):
+ if not self.trigger or self.trigger.closed():
+ self.trigger = EventInjector(self.events)
+ self.loop.selectables.append(self.trigger)
+ return self.trigger
def run(self):
self.loop.run()
-
class BlockingLink(object):
def __init__(self, connection, link):
self.connection = connection
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org