You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/03/05 15:39:41 UTC

svn commit: r633861 - in /incubator/qpid/trunk/qpid/python: hello-010-world qpid/assembler.py qpid/datatypes.py qpid/session.py qpid/spec010.py server010 tests/connection010.py

Author: rhs
Date: Wed Mar  5 06:39:40 2008
New Revision: 633861

URL: http://svn.apache.org/viewvc?rev=633861&view=rev
Log:
added incoming queues for messages; altered session dispatch to send entire assembly to a single handler; added logging switch for hello-010-world

Modified:
    incubator/qpid/trunk/qpid/python/hello-010-world
    incubator/qpid/trunk/qpid/python/qpid/assembler.py
    incubator/qpid/trunk/qpid/python/qpid/datatypes.py
    incubator/qpid/trunk/qpid/python/qpid/session.py
    incubator/qpid/trunk/qpid/python/qpid/spec010.py
    incubator/qpid/trunk/qpid/python/server010
    incubator/qpid/trunk/qpid/python/tests/connection010.py

Modified: incubator/qpid/trunk/qpid/python/hello-010-world
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-010-world?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-010-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-010-world Wed Mar  5 06:39:40 2008
@@ -1,13 +1,18 @@
 #!/usr/bin/env python
 
-import logging
+import sys, logging
 from qpid.connection010 import Connection
 from qpid.spec010 import load
 from qpid.util import connect
 from qpid.datatypes import Message
 
+if "-v" in sys.argv:
+  level = logging.DEBUG
+else:
+  level = logging.WARN
+
 format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
-logging.basicConfig(level=logging.DEBUG, format=format, datefmt='%H:%M:%S')
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
 
 spec = load("../specs/amqp.0-10.xml")
 conn = Connection(connect("0.0.0.0", spec.port), spec)
@@ -18,9 +23,14 @@
 ssn.queue_declare("asdf")
 
 ssn.message_transfer("this", None, None, Message("testing..."))
-ssn.message_transfer("is")
+ssn.message_transfer("is", None, None, Message("more testing..."))
 ssn.message_transfer("a")
 ssn.message_transfer("test")
+
+print ssn.incoming("this").get()
+print ssn.incoming("is").get()
+print ssn.incoming("a").get()
+print ssn.incoming("test").get()
 
 print ssn.queue_query("testing")
 

Modified: incubator/qpid/trunk/qpid/python/qpid/assembler.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/assembler.py?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/assembler.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/assembler.py Wed Mar  5 06:39:40 2008
@@ -46,7 +46,9 @@
 
   def decode_command(self, spec):
     sc = StringCodec(spec, self.payload)
-    return sc.read_command()
+    cmd = sc.read_command()
+    cmd.id = self.id
+    return cmd
 
   def decode_header(self, spec):
     sc = StringCodec(spec, self.payload)
@@ -56,7 +58,7 @@
     return values
 
   def decode_body(self, spec):
-    return self
+    return self.payload
 
   def __str__(self):
     return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,

Modified: incubator/qpid/trunk/qpid/python/qpid/datatypes.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/datatypes.py?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/datatypes.py Wed Mar  5 06:39:40 2008
@@ -43,6 +43,14 @@
     else:
       self.headers = None
 
+  def __repr__(self):
+    args = []
+    if self.headers:
+      args.extend(self.headers)
+    if self.body:
+      args.append(self.body)
+    return "Message(%s)" % ", ".join(map(repr, args))
+
 class Range:
 
   def __init__(self, lower, upper):

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Wed Mar  5 06:39:40 2008
@@ -17,11 +17,14 @@
 # under the License.
 #
 
-from threading import Event
+from threading import Event, RLock
 from invoker import Invoker
 from datatypes import RangeSet, Struct, Future
 from codec010 import StringCodec
 from assembler import Segment
+from queue import Queue
+from datatypes import Message
+from logging import getLogger
 
 class SessionDetached(Exception): pass
 
@@ -46,6 +49,20 @@
     self.delegate = delegate(self)
     self.send_id = True
     self.results = {}
+    self.lock = RLock()
+    self._incoming = {}
+    self.assembly = None
+
+  def incoming(self, destination):
+    self.lock.acquire()
+    try:
+      queue = self._incoming.get(destination)
+      if queue == None:
+        queue = Queue()
+        self._incoming[destination] = queue
+      return queue
+    finally:
+      self.lock.release()
 
   def close(self, timeout=None):
     self.channel.session_detach(self.name)
@@ -106,19 +123,37 @@
 
   def received(self, seg):
     self.receiver.received(seg)
-    if seg.type == self.spec["segment_type.command"].value:
-      cmd = seg.decode(self.spec)
-      attr = cmd.type.qname.replace(".", "_")
-      result = getattr(self.delegate, attr)(cmd)
-      if cmd.type.result:
-        self.execution_result(seg.id, result)
-    elif seg.type == self.spec["segment_type.header"].value:
-      self.delegate.header(seg.decode(self.spec))
-    elif seg.type == self.spec["segment_type.body"].value:
-      self.delegate.body(seg.decode(self.spec))
-    else:
-      raise ValueError("unknown segment type: %s" % seg.type)
-    self.receiver.completed(seg)
+    if seg.first:
+      assert self.assembly == None
+      self.assembly = []
+    self.assembly.append(seg)
+    if seg.last:
+      self.dispatch(self.assembly)
+      self.assembly = None
+
+  def dispatch(self, assembly):
+    cmd = assembly.pop(0).decode(self.spec)
+    args = []
+
+    for st in cmd.type.segments:
+      if assembly:
+        seg = assembly[0]
+        if seg.type == st.segment_type:
+          args.append(seg.decode(self.spec))
+          assembly.pop(0)
+          continue
+      args.append(None)
+
+    assert len(assembly) == 0
+
+    attr = cmd.type.qname.replace(".", "_")
+    result = getattr(self.delegate, attr)(cmd, *args)
+
+    if cmd.type.result:
+      self.execution_result(cmd.id, result)
+
+    for seg in assembly:
+      self.receiver.completed(seg)
 
   def send(self, seg):
     self.sender.send(seg)
@@ -196,13 +231,13 @@
     future = self.session.results[er.command_id]
     future.set(er.value)
 
-class Client(Delegate):
-
-  def message_transfer(self, cmd):
-    print "TRANSFER:", cmd
+msg = getLogger("qpid.ssn.msg")
 
-  def header(self, hdr):
-    print "HEADER:", hdr
+class Client(Delegate):
 
-  def body(self, seg):
-    print "BODY:", seg
+  def message_transfer(self, cmd, headers, body):
+    m = Message(body)
+    m.headers = headers
+    messages = self.session.incoming(cmd.destination)
+    messages.put(m)
+    msg.debug("RECV: %s", m)

Modified: incubator/qpid/trunk/qpid/python/qpid/spec010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec010.py?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec010.py Wed Mar  5 06:39:40 2008
@@ -194,7 +194,7 @@
       result[f.name] = a
 
     for k, v in kwargs.items():
-      f = self.named.get(k, None)
+      f = self.named.get(k)
       if f == None:
         raise TypeError("%s got an unexpected keyword argument '%s'" %
                         (self.name, k))
@@ -232,7 +232,7 @@
     flags = 0
     for i in range(len(self.fields)):
       f = self.fields[i]
-      if f.type.is_present(values.get(f.name, None)):
+      if f.type.is_present(values.get(f.name)):
         flags |= (0x1 << i)
     for i in range(self.pack):
       codec.write_uint8((flags >> 8*i) & 0xFF)
@@ -272,7 +272,10 @@
                              for f in self.fields])
     return "%s {\n    %s\n}" % (self.qname, fields)
 
-class Segment(Node):
+class Segment:
+
+  def __init__(self):
+    self.segment_type = None
 
   def register(self, node):
     self.spec = node.spec
@@ -284,7 +287,7 @@
 
   def __init__(self, name, code, children):
     Composite.__init__(self, name, code, 0, 2, children)
-    self.segment_type = None
+    Segment.__init__(self)
     self.track = None
     self.handlers = []
 
@@ -337,11 +340,17 @@
     self.header.encode(codec, cmd)
     Instruction.encode(self, codec, cmd)
 
-class Header(Segment):
+class Header(Segment, Node):
 
   def __init__(self, children):
+    Segment.__init__(self)
+    Node.__init__(self, children)
     self.entries = []
-    Segment.__init__(self, children)
+
+  def register(self, node):
+    Segment.register(self, node)
+    self.segment_type = self.spec["segment_type.header"].value
+    Node.register(self)
 
 class Entry(Lookup):
 
@@ -356,7 +365,16 @@
   def resolve(self):
     self.type = self.lookup(self.type)
 
-class Body(Segment):
+class Body(Segment, Node):
+
+  def __init__(self, children):
+    Segment.__init__(self)
+    Node.__init__(self, children)
+
+  def register(self, node):
+    Segment.register(self, node)
+    self.segment_type = self.spec["segment_type.body"].value
+    Node.register(self)
 
   def resolve(self): pass
 

Modified: incubator/qpid/trunk/qpid/python/server010
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/server010?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/server010 (original)
+++ incubator/qpid/trunk/qpid/python/server010 Wed Mar  5 06:39:40 2008
@@ -5,6 +5,7 @@
 from qpid.util import connect, listen
 from qpid.spec010 import load
 from qpid.session import Client
+from qpid.datatypes import Message
 
 spec = load("../specs/amqp.0-10.xml")
 
@@ -26,6 +27,11 @@
 
   def queue_query(self, qq):
     return qq.type.result.type.new((qq.queue,), {})
+
+  def message_transfer(self, cmd, header, body):
+    m = Message(body)
+    m.header = header
+    self.session.message_transfer(cmd.destination, cmd.accept_mode, cmd.acquire_mode, m)
 
 server = Server()
 

Modified: incubator/qpid/trunk/qpid/python/tests/connection010.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/connection010.py?rev=633861&r1=633860&r2=633861&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/connection010.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/connection010.py Wed Mar  5 06:39:40 2008
@@ -50,11 +50,8 @@
   def queue_query(self, qq):
     return qq.type.result.type.new((qq.queue,), {})
 
-  def message_transfer(self, cmd):
-    self.queue.put(cmd)
-
-  def body(self, body):
-    self.queue.put(body)
+  def message_transfer(self, cmd, header, body):
+    self.queue.put((cmd, header, body))
 
 class ConnectionTest(TestCase):
 
@@ -88,8 +85,8 @@
     c = Connection(connect("0.0.0.0", PORT), self.spec)
     c.start(10)
 
-    ssn1 = c.session("test1")
-    ssn2 = c.session("test2")
+    ssn1 = c.session("test1", timeout=10)
+    ssn2 = c.session("test2", timeout=10)
 
     assert ssn1 == c.sessions["test1"]
     assert ssn2 == c.sessions["test2"]
@@ -110,7 +107,7 @@
     assert ssn2 not in c.attached.values()
     assert ssn2 not in c.sessions.values()
 
-    ssn = c.session("session")
+    ssn = c.session("session", timeout=10)
 
     assert ssn.channel != None
     assert ssn in c.sessions.values()
@@ -121,16 +118,17 @@
       ssn.message_transfer(d)
 
     for d in destinations:
-      cmd = self.queue.get(10)
+      cmd, header, body = self.queue.get(10)
       assert cmd.destination == d
+      assert header == None
+      assert body == None
 
     msg = Message("this is a test")
     ssn.message_transfer("four", message=msg)
-    cmd = self.queue.get(10)
+    cmd, header, body = self.queue.get(10)
     assert cmd.destination == "four"
-    body = self.queue.get(10)
-    assert body.payload == msg.body
-    assert body.last
+    assert header == None
+    assert body == msg.body
 
     qq = ssn.queue_query("asdf")
     assert qq.queue == "asdf"