You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/16 21:26:13 UTC
svn commit: r519129 - in /incubator/qpid/trunk/qpid: ./ python/ python/qpid/ python/tests/
Author: aconway
Date: Fri Mar 16 13:26:11 2007
New Revision: 519129
URL: http://svn.apache.org/viewvc?view=rev&rev=519129
Log:
Merged revisions 496593 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line
0-9 request/response framing for python
........
Added:
incubator/qpid/trunk/qpid/python/hello-world
- copied unchanged from r496593, incubator/qpid/branches/qpid.0-9/python/hello-world
incubator/qpid/trunk/qpid/python/server
- copied unchanged from r496593, incubator/qpid/branches/qpid.0-9/python/server
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/python/amqp-doc
incubator/qpid/trunk/qpid/python/qpid/client.py
incubator/qpid/trunk/qpid/python/qpid/codec.py
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/delegate.py
incubator/qpid/trunk/qpid/python/qpid/message.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/spec.py
incubator/qpid/trunk/qpid/python/tests/example.py
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Mar 16 13:26:11 2007
@@ -1 +1 @@
-/incubator/qpid/branches/qpid.0-9:1-492620
+/incubator/qpid/branches/qpid.0-9:1-492620,496593
Modified: incubator/qpid/trunk/qpid/python/amqp-doc
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/amqp-doc?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/amqp-doc (original)
+++ incubator/qpid/trunk/qpid/python/amqp-doc Fri Mar 16 13:26:11 2007
@@ -42,7 +42,7 @@
die(str(e))
regexp = False
-spec = "../specs/amqp.0-8.xml"
+spec = "../specs/amqp.0-9.xml"
for k, v in opts:
if k == "-e" or k == "--regexp": regexp = True
if k == "-s" or k == "--spec": spec = v
Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Fri Mar 16 13:26:11 2007
@@ -25,7 +25,7 @@
import threading
from peer import Peer, Closed
from delegate import Delegate
-from connection import Connection, Frame
+from connection import Connection, Frame, connect
from spec import load
from queue import Queue
@@ -49,15 +49,13 @@
self.lock = threading.Lock()
self.closed = False
+ self.reason = None
self.started = threading.Event()
- self.conn = Connection(self.host, self.port, self.spec)
- self.peer = Peer(self.conn, ClientDelegate(self))
-
def wait(self):
self.started.wait()
if self.closed:
- raise EOFError()
+ raise Closed(self.reason)
def queue(self, key):
self.lock.acquire()
@@ -76,7 +74,9 @@
self.response = response
self.locale = locale
- self.conn.connect()
+ self.conn = Connection(connect(self.host, self.port), self.spec)
+ self.peer = Peer(self.conn, ClientDelegate(self))
+
self.conn.init()
self.peer.start()
self.wait()
@@ -92,12 +92,12 @@
self.client = client
def connection_start(self, ch, msg):
- ch.connection_start_ok(mechanism=self.client.mechanism,
- response=self.client.response,
- locale=self.client.locale)
+ msg.start_ok(mechanism=self.client.mechanism,
+ response=self.client.response,
+ locale=self.client.locale)
def connection_tune(self, ch, msg):
- ch.connection_tune_ok(*msg.fields)
+ msg.tune_ok(*msg.frame.args)
self.client.started.set()
def basic_deliver(self, ch, msg):
@@ -111,4 +111,5 @@
def close(self, reason):
self.client.closed = True
+ self.client.reason = reason
self.client.started.set()
Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Fri Mar 16 13:26:11 2007
@@ -185,6 +185,24 @@
result[key] = value
return result
+ def encode_timestamp(self, t):
+ # XXX
+ self.encode_longlong(t)
+
+ def decode_timestamp(self):
+ # XXX
+ return self.decode_longlong()
+
+ def encode_content(self, s):
+ # XXX
+ self.encode_octet(0)
+ self.encode_longstr(s)
+
+ def decode_content(self):
+ # XXX
+ self.decode_octet()
+ return self.decode_longstr()
+
def test(type, value):
if isinstance(value, (list, tuple)):
values = value
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Fri Mar 16 13:26:11 2007
@@ -25,7 +25,7 @@
import socket, codec,logging
from cStringIO import StringIO
-from spec import load, pythonize
+from spec import load
from codec import EOF
class SockIO:
@@ -53,19 +53,27 @@
def flush(self):
pass
+def connect(host, port):
+ sock = socket.socket()
+ sock.connect((host, port))
+ sock.setblocking(1)
+ return SockIO(sock)
+
+def listen(host, port, predicate = lambda: True):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ sock.listen(5)
+ while predicate():
+ s, a = sock.accept()
+ yield SockIO(s)
+
class Connection:
- def __init__(self, host, port, spec):
- self.host = host
- self.port = port
+ def __init__(self, io, spec):
+ self.codec = codec.Codec(io)
self.spec = spec
- self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
-
- def connect(self):
- sock = socket.socket()
- sock.connect((self.host, self.port))
- sock.setblocking(1)
- self.codec = codec.Codec(SockIO(sock))
+ self.FRAME_END = self.spec.constants.byname["frame_end"].id
def flush(self):
self.codec.flush()
@@ -76,53 +84,55 @@
self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
self.spec.minor)
+ def tini(self):
+ self.codec.unpack(Connection.INIT)
+
def write(self, frame):
c = self.codec
- c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
+ c.encode_octet(self.spec.constants.byname[frame.type].id)
c.encode_short(frame.channel)
- frame.payload.encode(c)
+ body = StringIO()
+ enc = codec.Codec(body)
+ frame.encode(enc)
+ enc.flush()
+ c.encode_longstr(body.getvalue())
c.encode_octet(self.FRAME_END)
def read(self):
c = self.codec
- type = pythonize(self.spec.constants.byid[c.decode_octet()].name)
+ type = self.spec.constants.byid[c.decode_octet()].name
channel = c.decode_short()
- payload = Frame.DECODERS[type].decode(self.spec, c)
+ body = c.decode_longstr()
+ dec = codec.Codec(StringIO(body))
+ frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+ frame.channel = channel
end = c.decode_octet()
if end != self.FRAME_END:
- raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
- frame = Frame(channel, payload)
+ garbage = ""
+ while end != self.FRAME_END:
+ garbage += chr(end)
+ end = c.decode_octet()
+ raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
return frame
class Frame:
- METHOD = "frame_method"
- HEADER = "frame_header"
- BODY = "frame_body"
- OOB_METHOD = "frame_oob_method"
- OOB_HEADER = "frame_oob_header"
- OOB_BODY = "frame_oob_body"
- TRACE = "frame_trace"
- HEARTBEAT = "frame_heartbeat"
-
DECODERS = {}
- def __init__(self, channel, payload):
- self.channel = channel
- self.payload = payload
-
- def __str__(self):
- return "[%d] %s" % (self.channel, self.payload)
-
-class Payload:
-
class __metaclass__(type):
def __new__(cls, name, bases, dict):
- for req in ("encode", "decode", "type"):
- if not dict.has_key(req):
- raise TypeError("%s must define %s" % (name, req))
+ for attr in ("encode", "decode", "type"):
+ if not dict.has_key(attr):
+ raise TypeError("%s must define %s" % (name, attr))
dict["decode"] = staticmethod(dict["decode"])
+ if dict.has_key("__init__"):
+ __init__ = dict["__init__"]
+ def init(self, *args, **kwargs):
+ args = list(args)
+ self.init(args, kwargs)
+ __init__(self, *args, **kwargs)
+ dict["__init__"] = init
t = type.__new__(cls, name, bases, dict)
if t.type != None:
Frame.DECODERS[t.type] = t
@@ -130,50 +140,100 @@
type = None
+ def init(self, args, kwargs):
+ self.channel = kwargs.pop("channel", 0)
+
def encode(self, enc): abstract
- def decode(spec, dec): abstract
+ def decode(spec, dec, size): abstract
-class Method(Payload):
+class Method(Frame):
- type = Frame.METHOD
+ type = "frame_method"
- def __init__(self, method, *args):
+ def __init__(self, method, args):
if len(args) != len(method.fields):
- argspec = ["%s: %s" % (pythonize(f.name), f.type)
+ argspec = ["%s: %s" % (f.name, f.type)
for f in method.fields]
raise TypeError("%s.%s expecting (%s), got %s" %
- (pythonize(method.klass.name),
- pythonize(method.name), ", ".join(argspec), args))
+ (method.klass.name, method.name, ", ".join(argspec),
+ args))
self.method = method
+ self.method_type = method
self.args = args
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
+ def encode(self, c):
c.encode_short(self.method.klass.id)
c.encode_short(self.method.id)
for field, arg in zip(self.method.fields, self.args):
c.encode(field.type, arg)
- c.flush()
- enc.encode_longstr(buf.getvalue())
- def decode(spec, dec):
- enc = dec.decode_longstr()
- c = codec.Codec(StringIO(enc))
+ def decode(spec, c, size):
klass = spec.classes.byid[c.decode_short()]
meth = klass.methods.byid[c.decode_short()]
args = tuple([c.decode(f.type) for f in meth.fields])
- return Method(meth, *args)
+ return Method(meth, args)
def __str__(self):
- return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+ return "[%s] %s %s" % (self.channel, self.method,
+ ", ".join([str(a) for a in self.args]))
-class Header(Payload):
+class Request(Frame):
- type = Frame.HEADER
+ type = "frame_request"
- def __init__(self, klass, weight, size, **properties):
+ def __init__(self, id, response_mark, method):
+ self.id = id
+ self.response_mark = response_mark
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.response_mark)
+ # reserved
+ enc.encode_long(0)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ mark = dec.decode_longlong()
+ # reserved
+ dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Request(id, mark, method)
+
+class Response(Frame):
+
+ type = "frame_response"
+
+ def __init__(self, id, request_id, batch_offset, method):
+ self.id = id
+ self.request_id = request_id
+ self.batch_offset = batch_offset
+ self.method = method
+ self.method_type = method.method_type
+ self.args = method.args
+
+ def encode(self, enc):
+ enc.encode_longlong(self.id)
+ enc.encode_longlong(self.request_id)
+ enc.encode_long(self.batch_offset)
+ self.method.encode(enc)
+
+ def decode(spec, dec, size):
+ id = dec.decode_longlong()
+ request_id = dec.decode_longlong()
+ batch_offset = dec.decode_long()
+ method = Method.decode(spec, dec, size - 20)
+ return Response(id, request_id, batch_offset, method)
+
+class Header(Frame):
+
+ type = "frame_header"
+
+ def __init__(self, klass, weight, size, properties):
self.klass = klass
self.weight = weight
self.size = size
@@ -188,9 +248,7 @@
def __delitem__(self, name):
del self.properties[name]
- def encode(self, enc):
- buf = StringIO()
- c = codec.Codec(buf)
+ def encode(self, c):
c.encode_short(self.klass.id)
c.encode_short(self.weight)
c.encode_longlong(self.size)
@@ -218,11 +276,8 @@
v = self.properties.get(f.name)
if v != None:
c.encode(f.type, v)
- c.flush()
- enc.encode_longstr(buf.getvalue())
- def decode(spec, dec):
- c = codec.Codec(StringIO(dec.decode_longstr()))
+ def decode(spec, c, size):
klass = spec.classes.byid[c.decode_short()]
weight = c.decode_short()
size = c.decode_longlong()
@@ -247,24 +302,31 @@
# plain '' strings can be used as keywords so we need to
# stringify the names.
properties[str(f.name)] = c.decode(f.type)
- return Header(klass, weight, size, **properties)
+ return Header(klass, weight, size, properties)
def __str__(self):
return "%s %s %s %s" % (self.klass, self.weight, self.size,
self.properties)
-class Body(Payload):
+class Body(Frame):
- type = Frame.BODY
+ type = "frame_body"
def __init__(self, content):
self.content = content
def encode(self, enc):
- enc.encode_longstr(self.content)
+ enc.write(self.content)
- def decode(spec, dec):
- return Body(dec.decode_longstr())
+ def decode(spec, dec, size):
+ return Body(dec.read(size))
def __str__(self):
return "Body(%r)" % self.content
+
+# TODO:
+# OOB_METHOD = "frame_oob_method"
+# OOB_HEADER = "frame_oob_header"
+# OOB_BODY = "frame_oob_body"
+# TRACE = "frame_trace"
+# HEARTBEAT = "frame_heartbeat"
Modified: incubator/qpid/trunk/qpid/python/qpid/delegate.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegate.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegate.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegate.py Fri Mar 16 13:26:11 2007
@@ -22,33 +22,25 @@
"""
import threading, inspect
-from spec import pythonize
+from connection import Method, Request, Response
class Delegate:
def __init__(self):
self.handlers = {}
self.invokers = {}
- # initialize all the mixins
- self.invoke_all("init")
- def invoke_all(self, meth, *args, **kwargs):
- for cls in inspect.getmro(self.__class__):
- if hasattr(cls, meth):
- getattr(cls, meth)(self, *args, **kwargs)
-
- def dispatch(self, channel, message):
- method = message.method
+ def __call__(self, channel, frame):
+ method = frame.method
try:
handler = self.handlers[method]
except KeyError:
- name = "%s_%s" % (pythonize(method.klass.name),
- pythonize(method.name))
+ name = "%s_%s" % (method.klass.name, method.name)
handler = getattr(self, name)
self.handlers[method] = handler
- return handler(channel, message)
+ return handler(channel, frame)
def close(self, reason):
- self.invoke_all("close", reason)
+ print "Connection closed: %s" % reason
Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Fri Mar 16 13:26:11 2007
@@ -16,22 +16,19 @@
# specific language governing permissions and limitations
# under the License.
#
+from connection import Method, Request
from sets import Set
class Message:
- COMMON_FIELDS = Set(("content", "method", "fields"))
-
- def __init__(self, method, fields, content = None):
- self.method = method
- self.fields = fields
+ def __init__(self, channel, frame, content = None):
+ self.channel = channel
+ self.frame = frame
+ self.method = frame.method_type
self.content = content
def __len__(self):
- l = len(self.fields)
- if self.method.content:
- l += 1
- return len(self.fields)
+ return len(self.frame.args)
def _idx(self, idx):
if idx < 0: idx += len(self)
@@ -40,45 +37,29 @@
return idx
def __getitem__(self, idx):
- idx = self._idx(idx)
- if idx == len(self.fields):
- return self.content
- else:
- return self.fields[idx]
-
- def __setitem__(self, idx, value):
- idx = self._idx(idx)
- if idx == len(self.fields):
- self.content = value
- else:
- self.fields[idx] = value
+ return self.frame.args[idx]
- def _slot(self, attr):
- if attr in Message.COMMON_FIELDS:
- env = self.__dict__
- key = attr
+ def __getattr__(self, attr):
+ fields = self.method.fields.byname
+ if fields.has_key(attr):
+ f = fields[attr]
+ result = self[self.method.fields.index(f)]
else:
- env = self.fields
- try:
- field = self.method.fields.bypyname[attr]
- key = self.method.fields.index(field)
- except KeyError:
+ for r in self.method.responses:
+ if attr == r.name:
+ result = lambda *args, **kwargs: \
+ self.channel.respond(Method(r, r.arguments(*args, **kwargs)),
+ self.frame)
+ break
+ else:
raise AttributeError(attr)
- return env, key
-
- def __getattr__(self, attr):
- env, key = self._slot(attr)
- return env[key]
-
- def __setattr__(self, attr, value):
- env, key = self._slot(attr)
- env[attr] = value
+ return result
STR = "%s %s content = %s"
REPR = STR.replace("%s", "%r")
def __str__(self):
- return Message.STR % (self.method, self.fields, self.content)
+ return Message.STR % (self.method, self.frame.args, self.content)
def __repr__(self):
- return Message.REPR % (self.method, self.fields, self.content)
+ return Message.REPR % (self.method, self.frame.args, self.content)
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Fri Mar 16 13:26:11 2007
@@ -24,13 +24,30 @@
incoming method frames to a delegate.
"""
-import thread, traceback, socket, sys, logging
-from connection import Frame, EOF, Method, Header, Body
+import thread, threading, traceback, socket, sys, logging
+from connection import EOF, Method, Header, Body, Request, Response
from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+class Sequence:
+
+ def __init__(self, start, step = 1):
+ # we should keep start for wrap around
+ self._next = start
+ self.step = step
+ self.lock = thread.allocate_lock()
+
+ def next(self):
+ self.lock.acquire()
+ try:
+ result = self._next
+ self._next += self.step
+ return result
+ finally:
+ self.lock.release()
+
class Peer:
def __init__(self, conn, delegate):
@@ -39,8 +56,6 @@
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
- self.Channel = type("Channel%s" % conn.spec.klass.__name__,
- (Channel, conn.spec.klass), {})
self.lock = thread.allocate_lock()
def channel(self, id):
@@ -49,7 +64,7 @@
try:
ch = self.channels[id]
except KeyError:
- ch = self.Channel(id, self.outgoing)
+ ch = Channel(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
finally:
self.lock.release()
@@ -64,7 +79,7 @@
"""Call when an unexpected exception occurs that will kill a thread."""
if message: print >> sys.stderr, message
self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
-
+
def reader(self):
try:
while True:
@@ -74,7 +89,7 @@
self.work.close()
break
ch = self.channel(frame.channel)
- ch.dispatch(frame, self.work)
+ ch.receive(frame, self.work)
except:
self.fatal()
@@ -99,37 +114,70 @@
def worker(self):
try:
while True:
- self.dispatch(self.work.get())
- except QueueClosed, e:
- self.close(e)
+ queue = self.work.get()
+ frame = queue.get()
+ channel = self.channel(frame.channel)
+ if frame.method_type.content:
+ content = read_content(queue)
+ else:
+ content = None
+
+ self.delegate(channel, Message(channel, frame, content))
except:
self.fatal()
- def dispatch(self, queue):
- frame = queue.get()
- channel = self.channel(frame.channel)
- payload = frame.payload
- if payload.method.content:
- content = read_content(queue)
+class Requester:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+ self.mark = 0
+ # request_id -> listener
+ self.outstanding = {}
+
+ def request(self, method, listener, content = None):
+ frame = Request(self.sequence.next(), self.mark, method)
+ self.outstanding[frame.id] = listener
+ self.write(frame, content)
+
+ def receive(self, channel, frame):
+ listener = self.outstanding.pop(frame.id)
+ listener(channel, frame)
+
+class Responder:
+
+ def __init__(self, writer):
+ self.write = writer
+ self.sequence = Sequence(1)
+
+ def respond(self, method, request):
+ if isinstance(request, Method):
+ self.write(method)
else:
- content = None
- # Let the caller deal with exceptions thrown here.
- message = Message(payload.method, payload.args, content)
- self.delegate.dispatch(channel, message)
+ # XXX: batching
+ frame = Response(self.sequence.next(), request.id, 0, method)
+ self.write(frame)
class Closed(Exception): pass
class Channel:
- def __init__(self, id, outgoing):
+ def __init__(self, id, outgoing, spec):
self.id = id
self.outgoing = outgoing
+ self.spec = spec
self.incoming = Queue(0)
self.responses = Queue(0)
self.queue = None
self.closed = False
self.reason = None
+ self.requester = Requester(self.write)
+ self.responder = Responder(self.write)
+
+ # XXX: better switch
+ self.reliable = False
+
def close(self, reason):
if self.closed:
return
@@ -138,43 +186,87 @@
self.incoming.close()
self.responses.close()
- def dispatch(self, frame, work):
- payload = frame.payload
- if isinstance(payload, Method):
- if payload.method.response:
+ def write(self, frame, content = None):
+ if self.closed:
+ raise Closed(self.reason)
+ frame.channel = self.id
+ self.outgoing.put(frame)
+ if (isinstance(frame, (Method, Request))
+ and content == None
+ and frame.method_type.content):
+ content = Content()
+ if content != None:
+ self.write_content(frame.method_type.klass, content)
+
+ def write_content(self, klass, content):
+ size = content.size()
+ header = Header(klass, content.weight(), size, content.properties)
+ self.write(header)
+ for child in content.children:
+ self.write_content(klass, child)
+ # should split up if content.body exceeds max frame size
+ if size > 0:
+ self.write(Body(content.body))
+
+ def receive(self, frame, work):
+ if isinstance(frame, Method):
+ if frame.method.response:
self.queue = self.responses
else:
self.queue = self.incoming
work.put(self.incoming)
+ elif isinstance(frame, Request):
+ self.queue = self.incoming
+ work.put(self.incoming)
+ elif isinstance(frame, Response):
+ self.requester.receive(self, frame)
+ return
self.queue.put(frame)
- def invoke(self, method, args, content = None):
- if self.closed:
- raise Closed(self.reason)
- frame = Frame(self.id, Method(method, *args))
- self.outgoing.put(frame)
+ def queue_response(self, channel, frame):
+ channel.responses.put(frame.method)
+
+ def request(self, method, listener, content = None):
+ self.requester.request(method, listener, content)
- if method.content:
- if content == None:
- content = Content()
- self.write_content(method.klass, content, self.outgoing)
+ def respond(self, method, request):
+ self.responder.respond(method, request)
+
+ def invoke(self, type, args, kwargs):
+ content = kwargs.pop("content", None)
+ frame = Method(type, type.arguments(*args, **kwargs))
+ if self.reliable:
+ self.request(frame, self.queue_response, content)
+ try:
+ resp = self.responses.get()
+ return Message(self, resp)
+ except QueueClosed, e:
+ if self.closed:
+ raise Closed(self.reason)
+ else:
+ raise e
+ else:
+ return self.invoke_method(frame, content)
+
+ def invoke_method(self, frame, content = None):
+ self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
- f = method.fields.byname["nowait"]
- nowait = args[method.fields.index(f)]
+ f = frame.method.fields.byname["nowait"]
+ nowait = frame.args[frame.method.fields.index(f)]
except KeyError:
nowait = False
try:
- if not nowait and method.responses:
- resp = self.responses.get().payload
+ if not nowait and frame.method.responses:
+ resp = self.responses.get()
if resp.method.content:
content = read_content(self.responses)
else:
content = None
- if resp.method in method.responses:
- return Message(resp.method, resp.args, content)
+ if resp.method in frame.method.responses:
+ return Message(self, resp, content)
else:
raise ValueError(resp)
except QueueClosed, e:
@@ -183,19 +275,15 @@
else:
raise e
- def write_content(self, klass, content, queue):
- size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
- queue.put(header)
- for child in content.children:
- self.write_content(klass, child, queue)
- # should split up if content.body exceeds max frame size
- if size > 0:
- queue.put(Frame(self.id, Body(content.body)))
+ def __getattr__(self, name):
+ type = self.spec.method(name)
+ if type == None: raise AttributeError(name)
+ method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
+ self.__dict__[name] = method
+ return method
def read_content(queue):
- frame = queue.get()
- header = frame.payload
+ header = queue.get()
children = []
for i in range(header.weight):
children.append(read_content(queue))
@@ -204,7 +292,7 @@
buf = StringIO()
while read < size:
body = queue.get()
- content = body.payload.content
+ content = body.content
buf.write(content)
read += len(content)
return Content(buf.getvalue(), children, header.properties.copy())
Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Fri Mar 16 13:26:11 2007
@@ -38,21 +38,16 @@
self.byname = {}
self.byid = {}
self.indexes = {}
- self.bypyname = {}
def add(self, item):
if self.byname.has_key(item.name):
raise ValueError("duplicate name: %s" % item)
if self.byid.has_key(item.id):
raise ValueError("duplicate id: %s" % item)
- pyname = pythonize(item.name)
- if self.bypyname.has_key(pyname):
- raise ValueError("duplicate pyname: %s" % item)
self.indexes[item] = len(self.items)
self.items.append(item)
self.byname[item.name] = item
self.byid[item.id] = item
- self.bypyname[pyname] = item
def index(self, item):
try:
@@ -91,11 +86,23 @@
self.file = file
self.constants = SpecContainer()
self.classes = SpecContainer()
+ # methods indexed by classname_methname
+ self.methods = {}
def post_load(self):
self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
+ def method(self, name):
+ if not self.methods.has_key(name):
+ for cls in self.classes:
+ clen = len(cls.name)
+ if name.startswith(cls.name) and name[clen] == "_":
+ end = name[clen + 1:]
+ if cls.methods.byname.has_key(end):
+ self.methods[name] = cls.methods.byname[end]
+ return self.methods.get(name)
+
def parse_method(self, name):
parts = re.split(r"\s*\.\s*", name)
if len(parts) != 2:
@@ -107,17 +114,16 @@
module = new.module(name, doc)
module.__file__ = self.file
for c in self.classes:
- classname = pythonize(c.name)
- cls = c.define_class(classname)
+ cls = c.define_class(c.name)
cls.__module__ = module.__name__
- setattr(module, classname, cls)
+ setattr(module, c.name, cls)
return module
def define_class(self, name):
methods = {}
for c in self.classes:
for m in c.methods:
- meth = pythonize(m.klass.name + "_" + m.name)
+ meth = m.klass.name + "_" + m.name
methods[meth] = m.define_method(meth)
return type(name, (), methods)
@@ -150,8 +156,7 @@
def define_class(self, name):
methods = {}
for m in self.methods:
- meth = pythonize(m.name)
- methods[meth] = m.define_method(meth)
+ methods[m.name] = m.define_method(m.name)
return type(name, (), methods)
class Method(Metadata):
@@ -172,11 +177,35 @@
self.docs = docs
self.response = False
+ def arguments(self, *args, **kwargs):
+ nargs = len(args) + len(kwargs)
+ maxargs = len(self.fields)
+ if nargs > maxargs:
+ self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
+ result = []
+ for f in self.fields:
+ idx = self.fields.index(f)
+ if idx < len(args):
+ result.append(args[idx])
+ elif kwargs.has_key(f.name):
+ result.append(kwargs.pop(f.name))
+ else:
+ result.append(Method.DEFAULTS[f.type])
+ for key, value in kwargs.items():
+ if self.fields.byname.has_key(key):
+ self._type_error("got multiple values for keyword argument '%s'", key)
+ else:
+ self._type_error("got an unexpected keyword argument '%s'", key)
+ return tuple(result)
+
+ def _type_error(self, msg, *args):
+ raise TypeError("%s %s" % (self.name, msg % args))
+
def docstring(self):
s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
for f in self.fields:
if f.docs:
- s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] +
+ s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
[fill(d, 4) for d in f.docs[1:]])
return s
@@ -195,16 +224,13 @@
def define_method(self, name):
g = {Method.METHOD: self}
l = {}
- args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields]
+ args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
+ methargs = args[:]
if self.content:
args += [("content", None)]
code = "def %s(self, %s):\n" % \
(name, ", ".join(["%s = %r" % a for a in args]))
code += " %r\n" % self.docstring()
- if self.content:
- methargs = args[:-1]
- else:
- methargs = args
argnames = ", ".join([a[0] for a in methargs])
code += " return self.invoke(%s" % Method.METHOD
if argnames:
@@ -239,7 +265,7 @@
type = f_nd["@type"]
while domains.has_key(type) and domains[type] != type:
type = domains[type]
- l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd)))
+ l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, get_docs(f_nd)))
def load(specfile):
doc = xmlutil.parse(specfile)
@@ -248,8 +274,8 @@
# constants
for nd in root["constant"]:
- const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"),
- get_docs(nd))
+ const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]),
+ nd.get("@class"), get_docs(nd))
spec.constants.add(const)
# domains are typedefs
@@ -259,14 +285,14 @@
# classes
for c_nd in root["class"]:
- klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"],
- get_docs(c_nd))
+ klass = Class(spec, pythonize(c_nd["@name"]), int(c_nd["@index"]),
+ c_nd["@handler"], get_docs(c_nd))
load_fields(c_nd, klass.fields, domains)
for m_nd in c_nd["method"]:
- meth = Method(klass, m_nd["@name"],
+ meth = Method(klass, pythonize(m_nd["@name"]),
int(m_nd["@index"]),
m_nd.get_bool("@content", False),
- [nd["@name"] for nd in m_nd["response"]],
+ [pythonize(nd["@name"]) for nd in m_nd["response"]],
m_nd.get_bool("@synchronous", False),
m_nd.text,
get_docs(m_nd))
Modified: incubator/qpid/trunk/qpid/python/tests/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/example.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/example.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/example.py Fri Mar 16 13:26:11 2007
@@ -58,7 +58,7 @@
# Here we use ordinal arguments.
self.exchange_declare(channel, 0, "test", "direct")
-
+
# Here we use keyword arguments.
self.queue_declare(channel, queue="test-queue")
channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
@@ -85,7 +85,7 @@
# argument in case the server hangs. By default queue.get() will wait
# until a message arrives or the connection to the server dies.
msg = queue.get(timeout=10)
-
+
# And check that we got the right response with assertEqual
self.assertEqual(body, msg.content.body)