You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/11/07 23:30:40 UTC
svn commit: r592927 - in /incubator/qpid/trunk/qpid/python: hello-world
qpid/client.py qpid/delegate.py qpid/peer.py
Author: rhs
Date: Wed Nov 7 14:30:40 2007
New Revision: 592927
URL: http://svn.apache.org/viewvc?rev=592927&view=rev
Log:
python API updates
Modified:
incubator/qpid/trunk/qpid/python/hello-world
incubator/qpid/trunk/qpid/python/qpid/client.py
incubator/qpid/trunk/qpid/python/qpid/delegate.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
Modified: incubator/qpid/trunk/qpid/python/hello-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-world?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-world Wed Nov 7 14:30:40 2007
@@ -5,20 +5,21 @@
client = Client("127.0.0.1", 5672)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
-ch = client.channel(1)
-ch.session_open()
-ch.queue_declare(queue="test")
-ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-#print ch.queue_query(queue="test")
-ch.message_subscribe(queue="test", destination="amq.direct")
-ch.message_flow("amq.direct", 0, 0xFFFFFFFF)
-ch.message_flow("amq.direct", 1, 0xFFFFFFFF)
+ssn = client.session()
+ssn.open()
+ssn.queue_declare(queue="test")
+ssn.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
+#print ssn.queue_query(queue="test")
+ssn.message_subscribe(queue="test", destination="amq.direct")
+ssn.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ssn.message_flow("amq.direct", 1, 0xFFFFFFFF)
msg = Content("hello world")
msg["content_type"] = "text/plain"
msg["routing_key"] = "test"
msg["reply_to"] = client.structs.reply_to("asdf", "fdsa")
msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
-ch.message_transfer(destination="amq.direct", content=msg)
+ssn.message_transfer(destination="amq.direct", content=msg)
queue = client.queue("amq.direct")
msg = queue.get(timeout=10)
print msg
+ssn.close()
Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Wed Nov 7 14:30:40 2007
@@ -23,7 +23,7 @@
"""
import os, threading
-from peer import Peer, Closed
+from peer import Peer, Channel, Closed
from delegate import Delegate
from connection import Connection, Frame, connect
from spec import load
@@ -45,6 +45,7 @@
raise EnvironmentError("environment variable AMQP_SPEC must be set")
self.spec = load(name)
self.structs = StructFactory(self.spec)
+ self.sessions = {}
self.mechanism = None
self.response = None
@@ -86,7 +87,7 @@
self.socket = connect(self.host, self.port)
self.conn = Connection(self.socket, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
+ self.peer = Peer(self.conn, ClientDelegate(self), Session)
self.conn.init()
self.peer.start()
@@ -94,10 +95,29 @@
self.channel(0).connection_open(self.vhost)
def channel(self, id):
- return self.peer.channel(id)
+ self.lock.acquire()
+ try:
+ ssn = self.peer.channel(id)
+ ssn.client = self
+ self.sessions[id] = ssn
+ finally:
+ self.lock.release()
+ return ssn
- def opened(self, ch):
- ch.references = References()
+ def session(self):
+ self.lock.acquire()
+ try:
+ id = None
+ for i in xrange(1, 64*1024):
+ if not self.sessions.has_key(id):
+ id = i
+ break
+ finally:
+ self.lock.release()
+ if id == None:
+ raise RuntimeError("out of channels")
+ else:
+ return self.channel(id)
def close(self):
self.socket.close()
@@ -144,16 +164,16 @@
msg.ok()
def channel_close(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def session_ack(self, ch, msg):
pass
def session_closed(self, ch, msg):
- ch.close(msg)
+ ch.closed(msg)
def connection_close(self, ch, msg):
- self.client.peer.close(msg)
+ self.client.peer.closed(msg)
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
@@ -162,7 +182,7 @@
future = ch.futures[msg.command_id]
future.put_response(ch, msg.data)
- def close(self, reason):
+ def closed(self, reason):
self.client.closed = True
self.client.reason = reason
self.client.started.set()
@@ -185,3 +205,21 @@
def struct(self, name, *args, **kwargs):
return self.spec.struct(name, *args, **kwargs)
+
+class Session(Channel):
+
+ def __init__(self, *args):
+ Channel.__init__(self, *args)
+ self.references = References()
+ self.client = None
+
+ def open(self):
+ self.session_open()
+
+ def close(self):
+ self.session_close()
+ self.client.lock.acquire()
+ try:
+ del self.client.sessions[self.id]
+ finally:
+ self.client.lock.release()
Modified: incubator/qpid/trunk/qpid/python/qpid/delegate.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegate.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegate.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegate.py Wed Nov 7 14:30:40 2007
@@ -49,5 +49,5 @@
print >> sys.stderr, "Error in handler: %s\n\n%s" % \
(_handler_name(method), traceback.format_exc())
- def close(self, reason):
+ def closed(self, reason):
print "Connection closed: %s" % reason
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?rev=592927&r1=592926&r2=592927&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Wed Nov 7 14:30:40 2007
@@ -51,14 +51,17 @@
class Peer:
- def __init__(self, conn, delegate, channel_callback=None):
+ def __init__(self, conn, delegate, channel_factory=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
- self.channel_callback = channel_callback #notified when channels are created
+ if channel_factory:
+ self.channel_factory = channel_factory
+ else:
+ self.channel_factory = Channel
def channel(self, id):
self.lock.acquire()
@@ -66,10 +69,8 @@
try:
ch = self.channels[id]
except KeyError:
- ch = Channel(id, self.outgoing, self.conn.spec)
+ ch = self.channel_factory(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
- if self.channel_callback:
- self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -82,7 +83,7 @@
def fatal(self, message=None):
"""Call when an unexpected exception occurs that will kill a thread."""
if message: print >> sys.stderr, message
- self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
+ self.closed("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
def reader(self):
try:
@@ -97,13 +98,13 @@
except:
self.fatal()
- def close(self, reason):
+ def closed(self, reason):
# We must close the delegate first because closing channels
# may wake up waiting threads and we don't want them to see
# the delegate as open.
- self.delegate.close(reason)
+ self.delegate.closed(reason)
for ch in self.channels.values():
- ch.close(reason)
+ ch.closed(reason)
def writer(self):
try:
@@ -112,7 +113,7 @@
message = self.outgoing.get()
self.conn.write(message)
except socket.error, e:
- self.close(e)
+ self.closed(e)
break
self.conn.flush()
except:
@@ -131,7 +132,7 @@
self.delegate(channel, Message(channel, frame, content))
except QueueClosed:
- self.close("worker closed")
+ self.closed("worker closed")
except:
self.fatal()
@@ -181,7 +182,7 @@
self.incoming = Queue(0)
self.responses = Queue(0)
self.queue = None
- self.closed = False
+ self._closed = False
self.reason = None
self.requester = Requester(self.write)
@@ -200,10 +201,10 @@
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
- def close(self, reason):
- if self.closed:
+ def closed(self, reason):
+ if self._closed:
return
- self.closed = True
+ self._closed = True
self.reason = reason
self.incoming.close()
self.responses.close()
@@ -213,7 +214,7 @@
f.put_response(self, reason)
def write(self, frame, content = None):
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
frame.channel = self.id
self.outgoing.put(frame)
@@ -283,7 +284,7 @@
if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_sync()
self.completion.wait()
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return None
try:
@@ -293,7 +294,7 @@
else:
return Message(self, resp)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -328,7 +329,7 @@
elif frame.method.result:
if self.synchronous:
fr = future.get_response(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
return fr
else:
@@ -337,12 +338,12 @@
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_sync()
completed = self.completion.wait(timeout=10)
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
if not completed:
- self.close("Timed-out waiting for completion of %s" % frame)
+ self.closed("Timed-out waiting for completion of %s" % frame)
except QueueClosed, e:
- if self.closed:
+ if self._closed:
raise Closed(self.reason)
else:
raise e
@@ -399,7 +400,7 @@
self.sequence = Sequence(0) #issues ids for outgoing commands
self.command_id = -1 #last issued id
self.mark = -1 #commands up to this mark are known to be complete
- self.closed = False
+ self._closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
@@ -413,7 +414,7 @@
self.reset()
self.condition.acquire()
try:
- self.closed = True
+ self._closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -433,10 +434,10 @@
remaining = timeout
self.condition.acquire()
try:
- while not self.closed and point_of_interest > self.mark:
+ while not self._closed and point_of_interest > self.mark:
#print "waiting for %s, mark = %s [%s]" % (point_of_interest, self.mark, self)
self.condition.wait(remaining)
- if not self.closed and point_of_interest > self.mark and timeout:
+ if not self._closed and point_of_interest > self.mark and timeout:
if (start_time + timeout) < time(): break
else: remaining = timeout - (time() - start_time)
finally: