You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/16 21:26:13 UTC

svn commit: r519129 - in /incubator/qpid/trunk/qpid: ./ python/ python/qpid/ python/tests/

Author: aconway
Date: Fri Mar 16 13:26:11 2007
New Revision: 519129

URL: http://svn.apache.org/viewvc?view=rev&rev=519129
Log:
Merged revisions 496593 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9

........
  r496593 | rhs | 2007-01-16 00:28:25 -0500 (Tue, 16 Jan 2007) | 1 line
  
  0-9 request/response framing for python
........

Added:
    incubator/qpid/trunk/qpid/python/hello-world
      - copied unchanged from r496593, incubator/qpid/branches/qpid.0-9/python/hello-world
    incubator/qpid/trunk/qpid/python/server
      - copied unchanged from r496593, incubator/qpid/branches/qpid.0-9/python/server
Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/python/amqp-doc
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/codec.py
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/delegate.py
    incubator/qpid/trunk/qpid/python/qpid/message.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/spec.py
    incubator/qpid/trunk/qpid/python/tests/example.py

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Mar 16 13:26:11 2007
@@ -1 +1 @@
-/incubator/qpid/branches/qpid.0-9:1-492620
+/incubator/qpid/branches/qpid.0-9:1-492620,496593

Modified: incubator/qpid/trunk/qpid/python/amqp-doc
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/amqp-doc?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/amqp-doc (original)
+++ incubator/qpid/trunk/qpid/python/amqp-doc Fri Mar 16 13:26:11 2007
@@ -42,7 +42,7 @@
   die(str(e))
 
 regexp = False
-spec = "../specs/amqp.0-8.xml"
+spec = "../specs/amqp.0-9.xml"
 for k, v in opts:
   if k == "-e" or k == "--regexp": regexp = True
   if k == "-s" or k == "--spec": spec = v

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Fri Mar 16 13:26:11 2007
@@ -25,7 +25,7 @@
 import threading
 from peer import Peer, Closed
 from delegate import Delegate
-from connection import Connection, Frame
+from connection import Connection, Frame, connect
 from spec import load
 from queue import Queue
 
@@ -49,15 +49,13 @@
     self.lock = threading.Lock()
 
     self.closed = False
+    self.reason = None
     self.started = threading.Event()
 
-    self.conn = Connection(self.host, self.port, self.spec)
-    self.peer = Peer(self.conn, ClientDelegate(self))
-
   def wait(self):
     self.started.wait()
     if self.closed:
-      raise EOFError()
+      raise Closed(self.reason)
 
   def queue(self, key):
     self.lock.acquire()
@@ -76,7 +74,9 @@
     self.response = response
     self.locale = locale
 
-    self.conn.connect()
+    self.conn = Connection(connect(self.host, self.port), self.spec)
+    self.peer = Peer(self.conn, ClientDelegate(self))
+
     self.conn.init()
     self.peer.start()
     self.wait()
@@ -92,12 +92,12 @@
     self.client = client
 
   def connection_start(self, ch, msg):
-    ch.connection_start_ok(mechanism=self.client.mechanism,
-                           response=self.client.response,
-                           locale=self.client.locale)
+    msg.start_ok(mechanism=self.client.mechanism,
+                 response=self.client.response,
+                 locale=self.client.locale)
 
   def connection_tune(self, ch, msg):
-    ch.connection_tune_ok(*msg.fields)
+    msg.tune_ok(*msg.frame.args)
     self.client.started.set()
 
   def basic_deliver(self, ch, msg):
@@ -111,4 +111,5 @@
 
   def close(self, reason):
     self.client.closed = True
+    self.client.reason = reason
     self.client.started.set()

Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Fri Mar 16 13:26:11 2007
@@ -185,6 +185,24 @@
       result[key] = value
     return result
 
+  def encode_timestamp(self, t):
+    # XXX
+    self.encode_longlong(t)
+
+  def decode_timestamp(self):
+    # XXX
+    return self.decode_longlong()
+
+  def encode_content(self, s):
+    # XXX
+    self.encode_octet(0)
+    self.encode_longstr(s)
+
+  def decode_content(self):
+    # XXX
+    self.decode_octet()
+    return self.decode_longstr()
+
 def test(type, value):
   if isinstance(value, (list, tuple)):
     values = value

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Fri Mar 16 13:26:11 2007
@@ -25,7 +25,7 @@
 
 import socket, codec,logging
 from cStringIO import StringIO
-from spec import load, pythonize
+from spec import load
 from codec import EOF
 
 class SockIO:
@@ -53,19 +53,27 @@
   def flush(self):
     pass
 
+def connect(host, port):
+  sock = socket.socket()
+  sock.connect((host, port))
+  sock.setblocking(1)
+  return SockIO(sock)
+
+def listen(host, port, predicate = lambda: True):
+  sock = socket.socket()
+  sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+  sock.bind((host, port))
+  sock.listen(5)
+  while predicate():
+    s, a = sock.accept()
+    yield SockIO(s)
+
 class Connection:
 
-  def __init__(self, host, port, spec):
-    self.host = host
-    self.port = port
+  def __init__(self, io, spec):
+    self.codec = codec.Codec(io)
     self.spec = spec
-    self.FRAME_END = self.spec.constants.bypyname["frame_end"].id
-
-  def connect(self):
-    sock = socket.socket()
-    sock.connect((self.host, self.port))
-    sock.setblocking(1)
-    self.codec = codec.Codec(SockIO(sock))
+    self.FRAME_END = self.spec.constants.byname["frame_end"].id
 
   def flush(self):
     self.codec.flush()
@@ -76,53 +84,55 @@
     self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major,
                     self.spec.minor)
 
+  def tini(self):
+    self.codec.unpack(Connection.INIT)
+
   def write(self, frame):
     c = self.codec
-    c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
+    c.encode_octet(self.spec.constants.byname[frame.type].id)
     c.encode_short(frame.channel)
-    frame.payload.encode(c)
+    body = StringIO()
+    enc = codec.Codec(body)
+    frame.encode(enc)
+    enc.flush()
+    c.encode_longstr(body.getvalue())
     c.encode_octet(self.FRAME_END)
 
   def read(self):
     c = self.codec
-    type = pythonize(self.spec.constants.byid[c.decode_octet()].name)
+    type = self.spec.constants.byid[c.decode_octet()].name
     channel = c.decode_short()
-    payload = Frame.DECODERS[type].decode(self.spec, c)
+    body = c.decode_longstr()
+    dec = codec.Codec(StringIO(body))
+    frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+    frame.channel = channel
     end = c.decode_octet()
     if end != self.FRAME_END:
-      raise "frame error: expected %r, got %r" % (self.FRAME_END, end)
-    frame = Frame(channel, payload)
+      garbage = ""
+      while end != self.FRAME_END:
+        garbage += chr(end)
+        end = c.decode_octet()
+      raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
     return frame
 
 class Frame:
 
-  METHOD = "frame_method"
-  HEADER = "frame_header"
-  BODY = "frame_body"
-  OOB_METHOD = "frame_oob_method"
-  OOB_HEADER = "frame_oob_header"
-  OOB_BODY = "frame_oob_body"
-  TRACE = "frame_trace"
-  HEARTBEAT = "frame_heartbeat"
-
   DECODERS = {}
 
-  def __init__(self, channel, payload):
-    self.channel = channel
-    self.payload = payload
-
-  def __str__(self):
-    return "[%d] %s" % (self.channel, self.payload)
-
-class Payload:
-
   class __metaclass__(type):
 
     def __new__(cls, name, bases, dict):
-      for req in ("encode", "decode", "type"):
-        if not dict.has_key(req):
-          raise TypeError("%s must define %s" % (name, req))
+      for attr in ("encode", "decode", "type"):
+        if not dict.has_key(attr):
+          raise TypeError("%s must define %s" % (name, attr))
       dict["decode"] = staticmethod(dict["decode"])
+      if dict.has_key("__init__"):
+        __init__ = dict["__init__"]
+        def init(self, *args, **kwargs):
+          args = list(args)
+          self.init(args, kwargs)
+          __init__(self, *args, **kwargs)
+        dict["__init__"] = init
       t = type.__new__(cls, name, bases, dict)
       if t.type != None:
         Frame.DECODERS[t.type] = t
@@ -130,50 +140,100 @@
 
   type = None
 
+  def init(self, args, kwargs):
+    self.channel = kwargs.pop("channel", 0)
+
   def encode(self, enc): abstract
 
-  def decode(spec, dec): abstract
+  def decode(spec, dec, size): abstract
 
-class Method(Payload):
+class Method(Frame):
 
-  type = Frame.METHOD
+  type = "frame_method"
 
-  def __init__(self, method, *args):
+  def __init__(self, method, args):
     if len(args) != len(method.fields):
-      argspec = ["%s: %s" % (pythonize(f.name), f.type)
+      argspec = ["%s: %s" % (f.name, f.type)
                  for f in method.fields]
       raise TypeError("%s.%s expecting (%s), got %s" %
-                      (pythonize(method.klass.name),
-                       pythonize(method.name), ", ".join(argspec), args))
+                      (method.klass.name, method.name, ", ".join(argspec),
+                       args))
     self.method = method
+    self.method_type = method
     self.args = args
 
-  def encode(self, enc):
-    buf = StringIO()
-    c = codec.Codec(buf)
+  def encode(self, c):
     c.encode_short(self.method.klass.id)
     c.encode_short(self.method.id)
     for field, arg in zip(self.method.fields, self.args):
       c.encode(field.type, arg)
-    c.flush()
-    enc.encode_longstr(buf.getvalue())
 
-  def decode(spec, dec):
-    enc = dec.decode_longstr()
-    c = codec.Codec(StringIO(enc))
+  def decode(spec, c, size):
     klass = spec.classes.byid[c.decode_short()]
     meth = klass.methods.byid[c.decode_short()]
     args = tuple([c.decode(f.type) for f in meth.fields])
-    return Method(meth, *args)
+    return Method(meth, args)
 
   def __str__(self):
-    return "%s %s" % (self.method, ", ".join([str(a) for a in self.args]))
+    return "[%s] %s %s" % (self.channel, self.method,
+                           ", ".join([str(a) for a in self.args]))
 
-class Header(Payload):
+class Request(Frame):
 
-  type = Frame.HEADER
+  type = "frame_request"
 
-  def __init__(self, klass, weight, size, **properties):
+  def __init__(self, id, response_mark, method):
+    self.id = id
+    self.response_mark = response_mark
+    self.method = method
+    self.method_type = method.method_type
+    self.args = method.args
+
+  def encode(self, enc):
+    enc.encode_longlong(self.id)
+    enc.encode_longlong(self.response_mark)
+    # reserved
+    enc.encode_long(0)
+    self.method.encode(enc)
+
+  def decode(spec, dec, size):
+    id = dec.decode_longlong()
+    mark = dec.decode_longlong()
+    # reserved
+    dec.decode_long()
+    method = Method.decode(spec, dec, size - 20)
+    return Request(id, mark, method)
+
+class Response(Frame):
+
+  type = "frame_response"
+
+  def __init__(self, id, request_id, batch_offset, method):
+    self.id = id
+    self.request_id = request_id
+    self.batch_offset = batch_offset
+    self.method = method
+    self.method_type = method.method_type
+    self.args = method.args
+
+  def encode(self, enc):
+    enc.encode_longlong(self.id)
+    enc.encode_longlong(self.request_id)
+    enc.encode_long(self.batch_offset)
+    self.method.encode(enc)
+
+  def decode(spec, dec, size):
+    id = dec.decode_longlong()
+    request_id = dec.decode_longlong()
+    batch_offset = dec.decode_long()
+    method = Method.decode(spec, dec, size - 20)
+    return Response(id, request_id, batch_offset, method)
+
+class Header(Frame):
+
+  type = "frame_header"
+
+  def __init__(self, klass, weight, size, properties):
     self.klass = klass
     self.weight = weight
     self.size = size
@@ -188,9 +248,7 @@
   def __delitem__(self, name):
     del self.properties[name]
 
-  def encode(self, enc):
-    buf = StringIO()
-    c = codec.Codec(buf)
+  def encode(self, c):
     c.encode_short(self.klass.id)
     c.encode_short(self.weight)
     c.encode_longlong(self.size)
@@ -218,11 +276,8 @@
       v = self.properties.get(f.name)
       if v != None:
         c.encode(f.type, v)
-    c.flush()
-    enc.encode_longstr(buf.getvalue())
 
-  def decode(spec, dec):
-    c = codec.Codec(StringIO(dec.decode_longstr()))
+  def decode(spec, c, size):
     klass = spec.classes.byid[c.decode_short()]
     weight = c.decode_short()
     size = c.decode_longlong()
@@ -247,24 +302,31 @@
         # plain '' strings can be used as keywords so we need to
         # stringify the names.
         properties[str(f.name)] = c.decode(f.type)
-    return Header(klass, weight, size, **properties)
+    return Header(klass, weight, size, properties)
 
   def __str__(self):
     return "%s %s %s %s" % (self.klass, self.weight, self.size,
                             self.properties)
 
-class Body(Payload):
+class Body(Frame):
 
-  type = Frame.BODY
+  type = "frame_body"
 
   def __init__(self, content):
     self.content = content
 
   def encode(self, enc):
-    enc.encode_longstr(self.content)
+    enc.write(self.content)
 
-  def decode(spec, dec):
-    return Body(dec.decode_longstr())
+  def decode(spec, dec, size):
+    return Body(dec.read(size))
 
   def __str__(self):
     return "Body(%r)" % self.content
+
+# TODO:
+#  OOB_METHOD = "frame_oob_method"
+#  OOB_HEADER = "frame_oob_header"
+#  OOB_BODY = "frame_oob_body"
+#  TRACE = "frame_trace"
+#  HEARTBEAT = "frame_heartbeat"

Modified: incubator/qpid/trunk/qpid/python/qpid/delegate.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegate.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegate.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegate.py Fri Mar 16 13:26:11 2007
@@ -22,33 +22,25 @@
 """
 
 import threading, inspect
-from spec import pythonize
+from connection import Method, Request, Response
 
 class Delegate:
 
   def __init__(self):
     self.handlers = {}
     self.invokers = {}
-    # initialize all the mixins
-    self.invoke_all("init")
 
-  def invoke_all(self, meth, *args, **kwargs):
-    for cls in inspect.getmro(self.__class__):
-      if hasattr(cls, meth):
-        getattr(cls, meth)(self, *args, **kwargs)
-
-  def dispatch(self, channel, message):
-    method = message.method
+  def __call__(self, channel, frame):
+    method = frame.method
 
     try:
       handler = self.handlers[method]
     except KeyError:
-      name = "%s_%s" % (pythonize(method.klass.name),
-                        pythonize(method.name))
+      name = "%s_%s" % (method.klass.name, method.name)
       handler = getattr(self, name)
       self.handlers[method] = handler
 
-    return handler(channel, message)
+    return handler(channel, frame)
 
   def close(self, reason):
-    self.invoke_all("close", reason)
+    print "Connection closed: %s" % reason

Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Fri Mar 16 13:26:11 2007
@@ -16,22 +16,19 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from connection import Method, Request
 from sets import Set
 
 class Message:
 
-  COMMON_FIELDS = Set(("content", "method", "fields"))
-
-  def __init__(self, method, fields, content = None):
-    self.method = method
-    self.fields = fields
+  def __init__(self, channel, frame, content = None):
+    self.channel = channel
+    self.frame = frame
+    self.method = frame.method_type
     self.content = content
 
   def __len__(self):
-    l = len(self.fields)
-    if self.method.content:
-      l += 1
-    return len(self.fields)
+    return len(self.frame.args)
 
   def _idx(self, idx):
     if idx < 0: idx += len(self)
@@ -40,45 +37,29 @@
     return idx
 
   def __getitem__(self, idx):
-    idx = self._idx(idx)
-    if idx == len(self.fields):
-      return self.content
-    else:
-      return self.fields[idx]
-
-  def __setitem__(self, idx, value):
-    idx = self._idx(idx)
-    if idx == len(self.fields):
-      self.content = value
-    else:
-      self.fields[idx] = value
+    return self.frame.args[idx]
 
-  def _slot(self, attr):
-    if attr in Message.COMMON_FIELDS:
-      env = self.__dict__
-      key = attr
+  def __getattr__(self, attr):
+    fields = self.method.fields.byname
+    if fields.has_key(attr):
+      f = fields[attr]
+      result = self[self.method.fields.index(f)]
     else:
-      env = self.fields
-      try:
-        field = self.method.fields.bypyname[attr]
-        key = self.method.fields.index(field)
-      except KeyError:
+      for r in self.method.responses:
+        if attr == r.name:
+          result = lambda *args, **kwargs: \
+                   self.channel.respond(Method(r, r.arguments(*args, **kwargs)),
+                                        self.frame)
+          break
+      else:
         raise AttributeError(attr)
-    return env, key
-
-  def __getattr__(self, attr):
-    env, key = self._slot(attr)
-    return env[key]
-
-  def __setattr__(self, attr, value):
-    env, key = self._slot(attr)
-    env[attr] = value
+    return result
 
   STR = "%s %s content = %s"
   REPR = STR.replace("%s", "%r")
 
   def __str__(self):
-    return Message.STR % (self.method, self.fields, self.content)
+    return Message.STR % (self.method, self.frame.args, self.content)
 
   def __repr__(self):
-    return Message.REPR % (self.method, self.fields, self.content)
+    return Message.REPR % (self.method, self.frame.args, self.content)

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Fri Mar 16 13:26:11 2007
@@ -24,13 +24,30 @@
 incoming method frames to a delegate.
 """
 
-import thread, traceback, socket, sys, logging
-from connection import Frame, EOF, Method, Header, Body
+import thread, threading, traceback, socket, sys, logging
+from connection import EOF, Method, Header, Body, Request, Response
 from message import Message
 from queue import Queue, Closed as QueueClosed
 from content import Content
 from cStringIO import StringIO
 
+class Sequence:
+
+  def __init__(self, start, step = 1):
+    # we should keep start for wrap around
+    self._next = start
+    self.step = step
+    self.lock = thread.allocate_lock()
+
+  def next(self):
+    self.lock.acquire()
+    try:
+      result = self._next
+      self._next += self.step
+      return result
+    finally:
+      self.lock.release()
+
 class Peer:
 
   def __init__(self, conn, delegate):
@@ -39,8 +56,6 @@
     self.outgoing = Queue(0)
     self.work = Queue(0)
     self.channels = {}
-    self.Channel = type("Channel%s" % conn.spec.klass.__name__,
-                        (Channel, conn.spec.klass), {})
     self.lock = thread.allocate_lock()
 
   def channel(self, id):
@@ -49,7 +64,7 @@
       try:
         ch = self.channels[id]
       except KeyError:
-        ch = self.Channel(id, self.outgoing)
+        ch = Channel(id, self.outgoing, self.conn.spec)
         self.channels[id] = ch
     finally:
       self.lock.release()
@@ -64,7 +79,7 @@
     """Call when an unexpected exception occurs that will kill a thread."""
     if message: print >> sys.stderr, message
     self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc()))
-    
+
   def reader(self):
     try:
       while True:
@@ -74,7 +89,7 @@
           self.work.close()
           break
         ch = self.channel(frame.channel)
-        ch.dispatch(frame, self.work)
+        ch.receive(frame, self.work)
     except:
       self.fatal()
 
@@ -99,37 +114,70 @@
   def worker(self):
     try:
       while True:
-        self.dispatch(self.work.get())
-    except QueueClosed, e:
-      self.close(e)
+        queue = self.work.get()
+        frame = queue.get()
+        channel = self.channel(frame.channel)
+        if frame.method_type.content:
+          content = read_content(queue)
+        else:
+          content = None
+
+        self.delegate(channel, Message(channel, frame, content))
     except:
       self.fatal()
 
-  def dispatch(self, queue):
-    frame = queue.get()
-    channel = self.channel(frame.channel)
-    payload = frame.payload
-    if payload.method.content:
-      content = read_content(queue)
+class Requester:
+
+  def __init__(self, writer):
+    self.write = writer
+    self.sequence = Sequence(1)
+    self.mark = 0
+    # request_id -> listener
+    self.outstanding = {}
+
+  def request(self, method, listener, content = None):
+    frame = Request(self.sequence.next(), self.mark, method)
+    self.outstanding[frame.id] = listener
+    self.write(frame, content)
+
+  def receive(self, channel, frame):
+    listener = self.outstanding.pop(frame.id)
+    listener(channel, frame)
+
+class Responder:
+
+  def __init__(self, writer):
+    self.write = writer
+    self.sequence = Sequence(1)
+
+  def respond(self, method, request):
+    if isinstance(request, Method):
+      self.write(method)
     else:
-      content = None
-    # Let the caller deal with exceptions thrown here.
-    message = Message(payload.method, payload.args, content)
-    self.delegate.dispatch(channel, message)
+      # XXX: batching
+      frame = Response(self.sequence.next(), request.id, 0, method)
+      self.write(frame)
 
 class Closed(Exception): pass
 
 class Channel:
 
-  def __init__(self, id, outgoing):
+  def __init__(self, id, outgoing, spec):
     self.id = id
     self.outgoing = outgoing
+    self.spec = spec
     self.incoming = Queue(0)
     self.responses = Queue(0)
     self.queue = None
     self.closed = False
     self.reason = None
 
+    self.requester = Requester(self.write)
+    self.responder = Responder(self.write)
+
+    # XXX: better switch
+    self.reliable = False
+
   def close(self, reason):
     if self.closed:
       return
@@ -138,43 +186,87 @@
     self.incoming.close()
     self.responses.close()
 
-  def dispatch(self, frame, work):
-    payload = frame.payload
-    if isinstance(payload, Method):
-      if payload.method.response:
+  def write(self, frame, content = None):
+    if self.closed:
+      raise Closed(self.reason)
+    frame.channel = self.id
+    self.outgoing.put(frame)
+    if (isinstance(frame, (Method, Request))
+        and content == None
+        and frame.method_type.content):
+      content = Content()
+    if content != None:
+      self.write_content(frame.method_type.klass, content)
+
+  def write_content(self, klass, content):
+    size = content.size()
+    header = Header(klass, content.weight(), size, content.properties)
+    self.write(header)
+    for child in content.children:
+      self.write_content(klass, child)
+    # should split up if content.body exceeds max frame size
+    if size > 0:
+      self.write(Body(content.body))
+
+  def receive(self, frame, work):
+    if isinstance(frame, Method):
+      if frame.method.response:
         self.queue = self.responses
       else:
         self.queue = self.incoming
         work.put(self.incoming)
+    elif isinstance(frame, Request):
+      self.queue = self.incoming
+      work.put(self.incoming)
+    elif isinstance(frame, Response):
+      self.requester.receive(self, frame)
+      return
     self.queue.put(frame)
 
-  def invoke(self, method, args, content = None):
-    if self.closed:
-      raise Closed(self.reason)
-    frame = Frame(self.id, Method(method, *args))
-    self.outgoing.put(frame)
+  def queue_response(self, channel, frame):
+    channel.responses.put(frame.method)
+
+  def request(self, method, listener, content = None):
+    self.requester.request(method, listener, content)
 
-    if method.content:
-      if content == None:
-        content = Content()
-      self.write_content(method.klass, content, self.outgoing)
+  def respond(self, method, request):
+    self.responder.respond(method, request)
+
+  def invoke(self, type, args, kwargs):
+    content = kwargs.pop("content", None)
+    frame = Method(type, type.arguments(*args, **kwargs))
+    if self.reliable:
+      self.request(frame, self.queue_response, content)
+      try:
+        resp = self.responses.get()
+        return Message(self, resp)
+      except QueueClosed, e:
+        if self.closed:
+          raise Closed(self.reason)
+        else:
+          raise e
+    else:
+      return self.invoke_method(frame, content)
+
+  def invoke_method(self, frame, content = None):
+    self.write(frame, content)
 
     try:
       # here we depend on all nowait fields being named nowait
-      f = method.fields.byname["nowait"]
-      nowait = args[method.fields.index(f)]
+      f = frame.method.fields.byname["nowait"]
+      nowait = frame.args[frame.method.fields.index(f)]
     except KeyError:
       nowait = False
 
     try:
-      if not nowait and method.responses:
-        resp = self.responses.get().payload
+      if not nowait and frame.method.responses:
+        resp = self.responses.get()
         if resp.method.content:
           content = read_content(self.responses)
         else:
           content = None
-        if resp.method in method.responses:
-          return Message(resp.method, resp.args, content)
+        if resp.method in frame.method.responses:
+          return Message(self, resp, content)
         else:
           raise ValueError(resp)
     except QueueClosed, e:
@@ -183,19 +275,15 @@
       else:
         raise e
 
-  def write_content(self, klass, content, queue):
-    size = content.size()
-    header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
-    queue.put(header)
-    for child in content.children:
-      self.write_content(klass, child, queue)
-    # should split up if content.body exceeds max frame size
-    if size > 0:
-      queue.put(Frame(self.id, Body(content.body)))
+  def __getattr__(self, name):
+    type = self.spec.method(name)
+    if type == None: raise AttributeError(name)
+    method = lambda *args, **kwargs: self.invoke(type, args, kwargs)
+    self.__dict__[name] = method
+    return method
 
 def read_content(queue):
-  frame = queue.get()
-  header = frame.payload
+  header = queue.get()
   children = []
   for i in range(header.weight):
     children.append(read_content(queue))
@@ -204,7 +292,7 @@
   buf = StringIO()
   while read < size:
     body = queue.get()
-    content = body.payload.content
+    content = body.content
     buf.write(content)
     read += len(content)
   return Content(buf.getvalue(), children, header.properties.copy())

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Fri Mar 16 13:26:11 2007
@@ -38,21 +38,16 @@
     self.byname = {}
     self.byid = {}
     self.indexes = {}
-    self.bypyname = {}
 
   def add(self, item):
     if self.byname.has_key(item.name):
       raise ValueError("duplicate name: %s" % item)
     if self.byid.has_key(item.id):
       raise ValueError("duplicate id: %s" % item)
-    pyname = pythonize(item.name)
-    if self.bypyname.has_key(pyname):
-      raise ValueError("duplicate pyname: %s" % item)
     self.indexes[item] = len(self.items)
     self.items.append(item)
     self.byname[item.name] = item
     self.byid[item.id] = item
-    self.bypyname[pyname] = item
 
   def index(self, item):
     try:
@@ -91,11 +86,23 @@
     self.file = file
     self.constants = SpecContainer()
     self.classes = SpecContainer()
+    # methods indexed by classname_methname
+    self.methods = {}
 
   def post_load(self):
     self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
     self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
 
+  def method(self, name):
+    if not self.methods.has_key(name):
+      for cls in self.classes:
+        clen = len(cls.name)
+        if name.startswith(cls.name) and name[clen] == "_":
+          end = name[clen + 1:]
+          if cls.methods.byname.has_key(end):
+            self.methods[name] = cls.methods.byname[end]
+    return self.methods.get(name)
+
   def parse_method(self, name):
     parts = re.split(r"\s*\.\s*", name)
     if len(parts) != 2:
@@ -107,17 +114,16 @@
     module = new.module(name, doc)
     module.__file__ = self.file
     for c in self.classes:
-      classname = pythonize(c.name)
-      cls = c.define_class(classname)
+      cls = c.define_class(c.name)
       cls.__module__ = module.__name__
-      setattr(module, classname, cls)
+      setattr(module, c.name, cls)
     return module
 
   def define_class(self, name):
     methods = {}
     for c in self.classes:
       for m in c.methods:
-        meth = pythonize(m.klass.name + "_" + m.name)
+        meth = m.klass.name + "_" + m.name
         methods[meth] = m.define_method(meth)
     return type(name, (), methods)
 
@@ -150,8 +156,7 @@
   def define_class(self, name):
     methods = {}
     for m in self.methods:
-      meth = pythonize(m.name)
-      methods[meth] = m.define_method(meth)
+      methods[m.name] = m.define_method(m.name)
     return type(name, (), methods)
 
 class Method(Metadata):
@@ -172,11 +177,35 @@
     self.docs = docs
     self.response = False
 
+  def arguments(self, *args, **kwargs):
+    nargs = len(args) + len(kwargs)
+    maxargs = len(self.fields)
+    if nargs > maxargs:
+      self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
+    result = []
+    for f in self.fields:
+      idx = self.fields.index(f)
+      if idx < len(args):
+        result.append(args[idx])
+      elif kwargs.has_key(f.name):
+        result.append(kwargs.pop(f.name))
+      else:
+        result.append(Method.DEFAULTS[f.type])
+    for key, value in kwargs.items():
+      if self.fields.byname.has_key(key):
+        self._type_error("got multiple values for keyword argument '%s'", key)
+      else:
+        self._type_error("got an unexpected keyword argument '%s'", key)
+    return tuple(result)
+
+  def _type_error(self, msg, *args):
+    raise TypeError("%s %s" % (self.name, msg % args))
+
   def docstring(self):
     s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
     for f in self.fields:
       if f.docs:
-        s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] +
+        s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
                                   [fill(d, 4) for d in f.docs[1:]])
     return s
 
@@ -195,16 +224,13 @@
   def define_method(self, name):
     g = {Method.METHOD: self}
     l = {}
-    args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields]
+    args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
+    methargs = args[:]
     if self.content:
       args += [("content", None)]
     code = "def %s(self, %s):\n" % \
            (name, ", ".join(["%s = %r" % a for a in args]))
     code += "  %r\n" % self.docstring()
-    if self.content:
-      methargs = args[:-1]
-    else:
-      methargs = args
     argnames = ", ".join([a[0] for a in methargs])
     code += "  return self.invoke(%s" % Method.METHOD
     if argnames:
@@ -239,7 +265,7 @@
       type = f_nd["@type"]
     while domains.has_key(type) and domains[type] != type:
       type = domains[type]
-    l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd)))
+    l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, get_docs(f_nd)))
 
 def load(specfile):
   doc = xmlutil.parse(specfile)
@@ -248,8 +274,8 @@
 
   # constants
   for nd in root["constant"]:
-    const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"),
-                     get_docs(nd))
+    const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]),
+                     nd.get("@class"), get_docs(nd))
     spec.constants.add(const)
 
   # domains are typedefs
@@ -259,14 +285,14 @@
 
   # classes
   for c_nd in root["class"]:
-    klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"],
-                  get_docs(c_nd))
+    klass = Class(spec, pythonize(c_nd["@name"]), int(c_nd["@index"]),
+                  c_nd["@handler"], get_docs(c_nd))
     load_fields(c_nd, klass.fields, domains)
     for m_nd in c_nd["method"]:
-      meth = Method(klass, m_nd["@name"],
+      meth = Method(klass, pythonize(m_nd["@name"]),
                     int(m_nd["@index"]),
                     m_nd.get_bool("@content", False),
-                    [nd["@name"] for nd in m_nd["response"]],
+                    [pythonize(nd["@name"]) for nd in m_nd["response"]],
                     m_nd.get_bool("@synchronous", False),
                     m_nd.text,
                     get_docs(m_nd))

Modified: incubator/qpid/trunk/qpid/python/tests/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/example.py?view=diff&rev=519129&r1=519128&r2=519129
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/example.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/example.py Fri Mar 16 13:26:11 2007
@@ -58,7 +58,7 @@
 
         # Here we use ordinal arguments.
         self.exchange_declare(channel, 0, "test", "direct")
-        
+
         # Here we use keyword arguments.
         self.queue_declare(channel, queue="test-queue")
         channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
@@ -85,7 +85,7 @@
         # argument in case the server hangs. By default queue.get() will wait
         # until a message arrives or the connection to the server dies.
         msg = queue.get(timeout=10)
-        
+
         # And check that we got the right response with assertEqual
         self.assertEqual(body, msg.content.body)