You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/08/10 17:55:17 UTC
svn commit: r564637 - in /incubator/qpid/trunk/qpid/python: hello-world
qpid/__init__.py qpid/client.py qpid/codec.py qpid/connection.py
qpid/content.py qpid/message.py qpid/peer.py qpid/spec.py server
tests/codec.py
Author: rhs
Date: Fri Aug 10 08:55:16 2007
New Revision: 564637
URL: http://svn.apache.org/viewvc?view=rev&rev=564637
Log:
added support for unpacked structs and execution.result
Modified:
incubator/qpid/trunk/qpid/python/hello-world
incubator/qpid/trunk/qpid/python/qpid/__init__.py
incubator/qpid/trunk/qpid/python/qpid/client.py
incubator/qpid/trunk/qpid/python/qpid/codec.py
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/content.py
incubator/qpid/trunk/qpid/python/qpid/message.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/spec.py
incubator/qpid/trunk/qpid/python/server
incubator/qpid/trunk/qpid/python/tests/codec.py
Modified: incubator/qpid/trunk/qpid/python/hello-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-world?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-world Fri Aug 10 08:55:16 2007
@@ -3,13 +3,13 @@
from qpid.client import Client
from qpid.content import Content
-client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-9.xml",
- "../specs/amqp-errata.0-9.xml"))
+client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml"))
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
ch = client.channel(1)
-ch.channel_open()
+ch.session_open()
ch.queue_declare(queue="test")
ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test")
-ch.message_consume(queue="test", destination="test")
-ch.message_transfer(destination="amq.direct", routing_key="test",
- body="hello world")
+print ch.queue_query(queue="test")
+ch.message_subscribe(queue="test", destination="test")
+ch.message_transfer(destination="amq.direct",
+ content=Content("hello world"))
Modified: incubator/qpid/trunk/qpid/python/qpid/__init__.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/__init__.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/__init__.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/__init__.py Fri Aug 10 08:55:16 2007
@@ -18,3 +18,26 @@
#
import spec, codec, connection, content, peer, delegate, client
+
+class Struct:
+
+ def __init__(self, type):
+ self.__dict__["type"] = type
+ self.__dict__["_values"] = {}
+
+ def _check(self, attr):
+ field = self.type.fields.byname.get(attr)
+ if field == None:
+ raise AttributeError(attr)
+ return field
+
+ def __setattr__(self, attr, value):
+ self._check(attr)
+ self._values[attr] = value
+
+ def __getattr__(self, attr):
+ field = self._check(attr)
+ return self._values.get(attr, field.default())
+
+ def __str__(self):
+ return "%s %s" % (self.type.type, self._values)
Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Fri Aug 10 08:55:16 2007
@@ -110,7 +110,7 @@
#todo: just override the params, i.e. don't require them
# all to be included in tune_params
msg.tune_ok(**self.client.tune_params)
- else:
+ else:
msg.tune_ok(*msg.frame.args)
self.client.started.set()
@@ -142,6 +142,10 @@
def execution_complete(self, ch, msg):
ch.completion.complete(msg.cumulative_execution_mark)
+
+ def execution_result(self, ch, msg):
+ future = ch.futures[msg.command_id]
+ future.put_response(ch, msg.data)
def close(self, reason):
self.client.closed = True
Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Fri Aug 10 08:55:16 2007
@@ -26,7 +26,7 @@
The unit test for this module is located in tests/codec.py
"""
-import re
+import re, qpid
from cStringIO import StringIO
from struct import *
from reference import ReferenceId
@@ -40,11 +40,12 @@
class that handles encoding/decoding of AMQP primitives
"""
- def __init__(self, stream):
+ def __init__(self, stream, spec):
"""
initializing the stream/fields used
"""
self.stream = stream
+ self.spec = spec
self.nwrote = 0
self.nread = 0
self.incoming_bits = []
@@ -163,7 +164,7 @@
# short int's valid range is [0,65535]
if (o < 0 or o > 65535):
- raise ValueError('Valid range of short int is [0,65535]')
+ raise ValueError('Valid range of short int is [0,65535]: %s' % o)
self.pack("!H", o)
@@ -255,7 +256,7 @@
encodes a table data structure in network byte order
"""
enc = StringIO()
- codec = Codec(enc)
+ codec = Codec(enc, self.spec)
if tbl:
for key, value in tbl.items():
if len(key) > 128:
@@ -356,3 +357,21 @@
def decode_uuid(self):
return self.decode_longstr()
+
+ def encode_long_struct(self, s):
+ enc = StringIO()
+ codec = Codec(enc, self.spec)
+ type = s.type
+ codec.encode_short(type.type)
+ for f in type.fields:
+ codec.encode(f.type, getattr(s, f.name))
+ codec.flush()
+ self.encode_longstr(enc.getvalue())
+
+ def decode_long_struct(self):
+ codec = Codec(StringIO(self.decode_longstr()), self.spec)
+ type = self.spec.structs[codec.decode_short()]
+ s = qpid.Struct(type)
+ for f in type.fields:
+ setattr(s, f.name, codec.decode(f.type))
+ return s
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Fri Aug 10 08:55:16 2007
@@ -74,7 +74,7 @@
class Connection:
def __init__(self, io, spec):
- self.codec = codec.Codec(io)
+ self.codec = codec.Codec(io, spec)
self.spec = spec
self.FRAME_END = self.spec.constants.byname["frame_end"].id
@@ -95,7 +95,7 @@
c.encode_octet(self.spec.constants.byname[frame.type].id)
c.encode_short(frame.channel)
body = StringIO()
- enc = codec.Codec(body)
+ enc = codec.Codec(body, self.spec)
frame.encode(enc)
enc.flush()
c.encode_longstr(body.getvalue())
@@ -106,7 +106,7 @@
type = self.spec.constants.byid[c.decode_octet()].name
channel = c.decode_short()
body = c.decode_longstr()
- dec = codec.Codec(StringIO(body))
+ dec = codec.Codec(StringIO(body), self.spec)
frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
frame.channel = channel
end = c.decode_octet()
Modified: incubator/qpid/trunk/qpid/python/qpid/content.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/content.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/content.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/content.py Fri Aug 10 08:55:16 2007
@@ -48,3 +48,11 @@
def __delitem__(self, name):
del self.properties[name]
+
+ def __str__(self):
+ if self.children:
+ return "%s [%s] %s" % (self.properties,
+ ", ".join(map(str, self.children)),
+ self.body)
+ else:
+ return "%s %s" % (self.properties, self.body)
Modified: incubator/qpid/trunk/qpid/python/qpid/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/message.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/message.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/message.py Fri Aug 10 08:55:16 2007
@@ -29,7 +29,7 @@
if self.method.is_l4_command():
self.command_id = self.channel.incoming_completion.sequence.next()
#print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
-
+
def __len__(self):
return len(self.frame.args)
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Fri Aug 10 08:55:16 2007
@@ -189,6 +189,7 @@
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
@@ -261,6 +262,7 @@
self.completion.reset()
self.incoming_completion.reset()
self.completion.next_command(type)
+
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
return self.invoker(frame, content)
@@ -275,7 +277,7 @@
self.request(frame, self.queue_response, content)
if not frame.method.responses:
- if self.use_execution_layer and type.is_l4_command():
+ if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_flush()
self.completion.wait()
if self.closed:
@@ -287,7 +289,6 @@
return Message(self, resp, read_content(self.responses))
else:
return Message(self, resp)
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -296,6 +297,11 @@
# used for 0-8 and 0-10
def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
self.write(frame, content)
try:
@@ -316,6 +322,11 @@
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ return future.get_response(timeout=10)
+ else:
+ return future
elif self.synchronous and not frame.method.response \
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_flush()
@@ -324,7 +335,6 @@
raise Closed(self.reason)
if not completed:
self.close("Timed-out waiting for completion")
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Fri Aug 10 08:55:16 2007
@@ -91,6 +91,8 @@
self.classes = SpecContainer()
# methods indexed by classname_methname
self.methods = {}
+ # structs by type code
+ self.structs = {}
def post_load(self):
self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
@@ -295,13 +297,18 @@
self.description = description
self.docs = docs
+ def default(self):
+ return Method.DEFAULTS[self.type]
+
def get_result(nd, spec):
result = nd["result"]
if not result: return None
name = result["@domain"]
if name != None: return spec.domains.byname[name]
st_nd = result["struct"]
- st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 +
+ int(st_nd["@type"]), st_nd["@pack"])
+ spec.structs[st.type] = st
load_fields(st_nd, st.fields, spec.domains.byname)
return st
@@ -352,7 +359,12 @@
type = nd["@type"]
if type == None:
st_nd = nd["struct"]
- type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"])
+ code = st_nd["@type"]
+ if code not in (None, "", "none"):
+ code = int(code)
+ type = Struct(st_nd["@size"], code, st_nd["@pack"])
+ if type.type != None:
+ spec.structs[type.type] = type
structs.append((type, st_nd))
else:
type = pythonize(type)
Modified: incubator/qpid/trunk/qpid/python/server
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/server?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/server (original)
+++ incubator/qpid/trunk/qpid/python/server Fri Aug 10 08:55:16 2007
@@ -3,22 +3,54 @@
from qpid.connection import Connection, listen
from qpid.delegate import Delegate
from qpid.peer import Peer
+from qpid import Struct
class Server(Delegate):
+ def __init__(self):
+ Delegate.__init__(self)
+ self.queues = {}
+ self.bindings = {}
+
def connection_open(self, ch, msg):
msg.open_ok()
- def channel_open(self, ch, msg):
- print "channel %s open" % ch.id
- msg.open_ok()
+ def session_open(self, ch, msg):
+ print "session open on channel %s" % ch.id
+ msg.attached()
+
+ def execution_flush(self, ch, msg):
+ pass
+
+ def queue_declare(self, ch, msg):
+ self.queues[msg.queue] = []
+ print "queue declared: %s" % msg.queue
+ msg.complete()
+
+ def queue_bind(self, ch, msg):
+ if self.bindings.has_key(msg.exchange):
+ queues = self.bindings[msg.exchange]
+ else:
+ queues = set()
+ self.bindings[msg.exchange] = queues
+ queues.add((msg.routing_key, msg.queue))
+ msg.complete()
+
+ def queue_query(self, ch, msg):
+ st = Struct(msg.method.result)
+ ch.execution_result(msg.command_id, st)
+ msg.complete()
+
+ def message_subscribe(self, ch, msg):
+ print msg
+ msg.complete()
def message_transfer(self, ch, msg):
- print msg.body
- msg.ok()
+ print msg.content
+ msg.complete()
-spec = qpid.spec.load("../specs/amqp.0-9.xml")
+spec = qpid.spec.load("../specs/amqp.0-10-preview.xml")
for io in listen("0.0.0.0", 5672):
c = Connection(io, spec)
Modified: incubator/qpid/trunk/qpid/python/tests/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/codec.py?view=diff&rev=564637&r1=564636&r2=564637
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/codec.py Fri Aug 10 08:55:16 2007
@@ -20,6 +20,7 @@
import unittest
from qpid.codec import Codec
+from qpid.spec import load
from cStringIO import StringIO
from qpid.reference import ReferenceId
@@ -52,11 +53,13 @@
"""
+SPEC = load("../specs/amqp.0-10-preview.xml")
# --------------------------------------
# --------------------------------------
class BaseDataTypes(unittest.TestCase):
+
"""
Base class containing common functions
"""
@@ -66,7 +69,7 @@
"""
standard setUp for unitetest (refer unittest documentation for details)
"""
- self.codec = Codec(StringIO())
+ self.codec = Codec(StringIO(), SPEC)
# ------------------
def tearDown(self):
@@ -504,7 +507,7 @@
else:
values = [value]
stream = StringIO()
- codec = Codec(stream)
+ codec = Codec(stream, SPEC)
for v in values:
codec.encode(type, v)
codec.flush()