You are viewing a plain text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/08/10 17:55:17 UTC

svn commit: r564637 - in /incubator/qpid/trunk/qpid/python: hello-world qpid/__init__.py qpid/client.py qpid/codec.py qpid/connection.py qpid/content.py qpid/message.py qpid/peer.py qpid/spec.py server tests/codec.py

Author: rhs
Date: Fri Aug 10 08:55:16 2007
New Revision: 564637

URL: http://svn.apache.org/viewvc?view=rev&rev=564637
Log:
added support for unpacked structs and execution.result

Modified:
    incubator/qpid/trunk/qpid/python/hello-world
    incubator/qpid/trunk/qpid/python/qpid/__init__.py
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/codec.py
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/content.py
    incubator/qpid/trunk/qpid/python/qpid/message.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/spec.py
    incubator/qpid/trunk/qpid/python/server
    incubator/qpid/trunk/qpid/python/tests/codec.py

Modified: incubator/qpid/trunk/qpid/python/hello-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-world?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-world Fri Aug 10 08:55:16 2007
@@ -3,13 +3,13 @@
 from qpid.client import Client
 from qpid.content import Content
 
-client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-9.xml",
-                                                  "../specs/amqp-errata.0-9.xml"))
+client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml"))
 client.start({"LOGIN": "guest", "PASSWORD": "guest"})
 ch = client.channel(1)
-ch.channel_open()
+ch.session_open()
 ch.queue_declare(queue="test")
 ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-ch.message_consume(queue="test", destination="test")
-ch.message_transfer(destination="amq.direct", routing_key="test",
-                    body="hello world")
+print ch.queue_query(queue="test")
+ch.message_subscribe(queue="test", destination="test")
+ch.message_transfer(destination="amq.direct",
+                    content=Content("hello world"))

Modified: incubator/qpid/trunk/qpid/python/qpid/__init__.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/__init__.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/__init__.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/__init__.py Fri Aug 10 08:55:16 2007
@@ -18,3 +18,26 @@
 #
 
 import spec, codec, connection, content, peer, delegate, client
+
+class Struct:
+
+  def __init__(self, type):
+    self.__dict__["type"] = type
+    self.__dict__["_values"] = {}
+
+  def _check(self, attr):
+    field = self.type.fields.byname.get(attr)
+    if field == None:
+      raise AttributeError(attr)
+    return field
+
+  def __setattr__(self, attr, value):
+    self._check(attr)
+    self._values[attr] = value
+
+  def __getattr__(self, attr):
+    field = self._check(attr)
+    return self._values.get(attr, field.default())
+
+  def __str__(self):
+    return "%s %s" % (self.type.type, self._values)

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Fri Aug 10 08:55:16 2007
@@ -110,7 +110,7 @@
       #todo: just override the params, i.e. don't require them
       #      all to be included in tune_params
       msg.tune_ok(**self.client.tune_params)
-    else:  
+    else:
       msg.tune_ok(*msg.frame.args)
     self.client.started.set()
 
@@ -142,6 +142,10 @@
 
   def execution_complete(self, ch, msg):
     ch.completion.complete(msg.cumulative_execution_mark)
+
+  def execution_result(self, ch, msg):
+    future = ch.futures[msg.command_id]
+    future.put_response(ch, msg.data)
 
   def close(self, reason):
     self.client.closed = True

Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Fri Aug 10 08:55:16 2007
@@ -26,7 +26,7 @@
 The unit test for this module is located in tests/codec.py
 """
 
-import re
+import re, qpid
 from cStringIO import StringIO
 from struct import *
 from reference import ReferenceId
@@ -40,11 +40,12 @@
   class that handles encoding/decoding of AMQP primitives
   """
 
-  def __init__(self, stream):
+  def __init__(self, stream, spec):
     """
     initializing the stream/fields used
     """
     self.stream = stream
+    self.spec = spec
     self.nwrote = 0
     self.nread = 0
     self.incoming_bits = []
@@ -163,7 +164,7 @@
 
     # short int's valid range is [0,65535]
     if (o < 0 or o > 65535):
-        raise ValueError('Valid range of short int is [0,65535]')
+        raise ValueError('Valid range of short int is [0,65535]: %s' % o)
 
     self.pack("!H", o)
 
@@ -255,7 +256,7 @@
     encodes a table data structure in network byte order
     """
     enc = StringIO()
-    codec = Codec(enc)
+    codec = Codec(enc, self.spec)
     if tbl:
       for key, value in tbl.items():
         if len(key) > 128:
@@ -356,3 +357,21 @@
 
   def decode_uuid(self):
     return self.decode_longstr()
+
+  def encode_long_struct(self, s):
+    enc = StringIO()
+    codec = Codec(enc, self.spec)
+    type = s.type
+    codec.encode_short(type.type)
+    for f in type.fields:
+      codec.encode(f.type, getattr(s, f.name))
+    codec.flush()
+    self.encode_longstr(enc.getvalue())
+
+  def decode_long_struct(self):
+    codec = Codec(StringIO(self.decode_longstr()), self.spec)
+    type = self.spec.structs[codec.decode_short()]
+    s = qpid.Struct(type)
+    for f in type.fields:
+      setattr(s, f.name, codec.decode(f.type))
+    return s

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Fri Aug 10 08:55:16 2007
@@ -74,7 +74,7 @@
 class Connection:
 
   def __init__(self, io, spec):
-    self.codec = codec.Codec(io)
+    self.codec = codec.Codec(io, spec)
     self.spec = spec
     self.FRAME_END = self.spec.constants.byname["frame_end"].id
 
@@ -95,7 +95,7 @@
     c.encode_octet(self.spec.constants.byname[frame.type].id)
     c.encode_short(frame.channel)
     body = StringIO()
-    enc = codec.Codec(body)
+    enc = codec.Codec(body, self.spec)
     frame.encode(enc)
     enc.flush()
     c.encode_longstr(body.getvalue())
@@ -106,7 +106,7 @@
     type = self.spec.constants.byid[c.decode_octet()].name
     channel = c.decode_short()
     body = c.decode_longstr()
-    dec = codec.Codec(StringIO(body))
+    dec = codec.Codec(StringIO(body), self.spec)
     frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
     frame.channel = channel
     end = c.decode_octet()

Modified: incubator/qpid/trunk/qpid/python/qpid/content.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/content.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/content.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/content.py Fri Aug 10 08:55:16 2007
@@ -48,3 +48,11 @@
 
   def __delitem__(self, name):
     del self.properties[name]
+
+  def __str__(self):
+    if self.children:
+      return "%s [%s] %s" % (self.properties,
+                             ", ".join(map(str, self.children)),
+                             self.body)
+    else:
+      return "%s %s" % (self.properties, self.body)

Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Fri Aug 10 08:55:16 2007
@@ -29,7 +29,7 @@
     if self.method.is_l4_command():
       self.command_id = self.channel.incoming_completion.sequence.next()
       #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
-      
+
   def __len__(self):
     return len(self.frame.args)
 

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Fri Aug 10 08:55:16 2007
@@ -189,6 +189,7 @@
 
     self.completion = OutgoingCompletion()
     self.incoming_completion = IncomingCompletion(self)
+    self.futures = {}
 
     # Use reliable framing if version == 0-9.
     if spec.major == 0 and spec.minor == 9:
@@ -261,6 +262,7 @@
       self.completion.reset()
       self.incoming_completion.reset()
     self.completion.next_command(type)
+
     content = kwargs.pop("content", None)
     frame = Method(type, type.arguments(*args, **kwargs))
     return self.invoker(frame, content)
@@ -275,7 +277,7 @@
 
     self.request(frame, self.queue_response, content)
     if not frame.method.responses:
-      if self.use_execution_layer and type.is_l4_command():
+      if self.use_execution_layer and frame.method_type.is_l4_command():
         self.execution_flush()
         self.completion.wait()
         if self.closed:
@@ -287,7 +289,6 @@
         return Message(self, resp, read_content(self.responses))
       else:
         return Message(self, resp)
-
     except QueueClosed, e:
       if self.closed:
         raise Closed(self.reason)
@@ -296,6 +297,11 @@
 
   # used for 0-8 and 0-10
   def invoke_method(self, frame, content = None):
+    if frame.method.result:
+      cmd_id = self.completion.command_id
+      future = Future()
+      self.futures[cmd_id] = future
+
     self.write(frame, content)
 
     try:
@@ -316,6 +322,11 @@
           return Message(self, resp, content)
         else:
           raise ValueError(resp)
+      elif frame.method.result:
+        if self.synchronous:
+          return future.get_response(timeout=10)
+        else:
+          return future
       elif self.synchronous and not frame.method.response \
                and self.use_execution_layer and frame.method.is_l4_command():
         self.execution_flush()
@@ -324,7 +335,6 @@
           raise Closed(self.reason)
         if not completed:
           self.close("Timed-out waiting for completion")
-
     except QueueClosed, e:
       if self.closed:
         raise Closed(self.reason)

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Fri Aug 10 08:55:16 2007
@@ -91,6 +91,8 @@
     self.classes = SpecContainer()
     # methods indexed by classname_methname
     self.methods = {}
+    # structs by type code
+    self.structs = {}
 
   def post_load(self):
     self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
@@ -295,13 +297,18 @@
     self.description = description
     self.docs = docs
 
+  def default(self):
+    return Method.DEFAULTS[self.type]
+
 def get_result(nd, spec):
   result = nd["result"]
   if not result: return None
   name = result["@domain"]
   if name != None: return spec.domains.byname[name]
   st_nd = result["struct"]
-  st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+  st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
+              int(st_nd["@type"]), st_nd["@pack"])
+  spec.structs[st.type] = st
   load_fields(st_nd, st.fields, spec.domains.byname)
   return st
 
@@ -352,7 +359,12 @@
       type = nd["@type"]
       if type == None:
         st_nd = nd["struct"]
-        type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+        code = st_nd["@type"]
+        if code not in (None, "", "none"):
+          code = int(code)
+        type = Struct(st_nd["@size"], code, st_nd["@pack"])
+        if type.type != None:
+          spec.structs[type.type] = type
         structs.append((type, st_nd))
       else:
         type = pythonize(type)

Modified: incubator/qpid/trunk/qpid/python/server
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/server?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/server (original)
+++ incubator/qpid/trunk/qpid/python/server Fri Aug 10 08:55:16 2007
@@ -3,22 +3,54 @@
 from qpid.connection import Connection, listen
 from qpid.delegate import Delegate
 from qpid.peer import Peer
+from qpid import Struct
 
 class Server(Delegate):
 
+  def __init__(self):
+    Delegate.__init__(self)
+    self.queues = {}
+    self.bindings = {}
+
   def connection_open(self, ch, msg):
     msg.open_ok()
 
-  def channel_open(self, ch, msg):
-    print "channel %s open" % ch.id
-    msg.open_ok()
+  def session_open(self, ch, msg):
+    print "session open on channel %s" % ch.id
+    msg.attached()
+
+  def execution_flush(self, ch, msg):
+    pass
+
+  def queue_declare(self, ch, msg):
+    self.queues[msg.queue] = []
+    print "queue declared: %s" % msg.queue
+    msg.complete()
+
+  def queue_bind(self, ch, msg):
+    if self.bindings.has_key(msg.exchange):
+      queues = self.bindings[msg.exchange]
+    else:
+      queues = set()
+      self.bindings[msg.exchange] = queues
+    queues.add((msg.routing_key, msg.queue))
+    msg.complete()
+
+  def queue_query(self, ch, msg):
+    st = Struct(msg.method.result)
+    ch.execution_result(msg.command_id, st)
+    msg.complete()
+
+  def message_subscribe(self, ch, msg):
+    print msg
+    msg.complete()
 
   def message_transfer(self, ch, msg):
-    print msg.body
-    msg.ok()
+    print msg.content
+    msg.complete()
 
 
-spec = qpid.spec.load("../specs/amqp.0-9.xml")
+spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")
 
 for io in listen("0.0.0.0", 5672):
   c = Connection(io, spec)

Modified: incubator/qpid/trunk/qpid/python/tests/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/codec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/codec.py Fri Aug 10 08:55:16 2007
@@ -20,6 +20,7 @@
 
 import unittest
 from qpid.codec import Codec
+from qpid.spec import load
 from cStringIO import StringIO
 from qpid.reference import ReferenceId
 
@@ -52,11 +53,13 @@
 
 """
 
+SPEC = load("../specs/amqp.0-10-preview.xml")
 
 # --------------------------------------
 # --------------------------------------
 class BaseDataTypes(unittest.TestCase):
 
+
     """
     Base class containing common functions
     """
@@ -66,7 +69,7 @@
         """
         standard setUp for unitetest (refer unittest documentation for details)
         """
-        self.codec = Codec(StringIO())
+        self.codec = Codec(StringIO(), SPEC)
 
     # ------------------
     def tearDown(self):
@@ -504,7 +507,7 @@
     else:
       values = [value]
     stream = StringIO()
-    codec = Codec(stream)
+    codec = Codec(stream, SPEC)
     for v in values:
       codec.encode(type, v)
     codec.flush()