You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/06/02 16:24:58 UTC

svn commit: r781044 - in /qpid/trunk/qpid/python: qpid-python-test qpid/debug.py qpid/messaging.py qpid/testlib.py qpid/tests/ qpid/tests/__init__.py qpid/tests/messaging.py

Author: rhs
Date: Tue Jun  2 14:24:57 2009
New Revision: 781044

URL: http://svn.apache.org/viewvc?rev=781044&view=rev
Log:
first commit of new messaging API and test harness

Added:
    qpid/trunk/qpid/python/qpid-python-test   (with props)
    qpid/trunk/qpid/python/qpid/debug.py
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/
    qpid/trunk/qpid/python/qpid/tests/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified:
    qpid/trunk/qpid/python/qpid/testlib.py

Added: qpid/trunk/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid-python-test?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid-python-test (added)
+++ qpid/trunk/qpid/python/qpid-python-test Tue Jun  2 14:24:57 2009
@@ -0,0 +1,450 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, logging, traceback, types
+from fnmatch import fnmatchcase as match
+from getopt import GetoptError
+from logging import getLogger, StreamHandler, Formatter, Filter, \
+    WARN, DEBUG, ERROR
+from qpid.util import URL
+
+levels = {
+  "DEBUG": DEBUG,
+  "WARN": WARN,
+  "ERROR": ERROR
+  }
+
+sorted_levels = [(v, k) for k, v in levels.items()]
+sorted_levels.sort()
+sorted_levels = [v for k, v in sorted_levels]
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+                               description="Run tests matching the specified PATTERNs.")
+parser.add_option("-l", "--list", action="store_true", default=False,
+                  help="list tests instead of executing them")
+parser.add_option("-b", "--broker", default="localhost",
+                  help="run tests against BROKER (default %default)")
+parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE")
+parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN",
+                  help="only display log messages of LEVEL or higher severity: "
+                  "%s (default %%default)" % ", ".join(sorted_levels))
+parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append",
+                  dest="log_categories", default=[],
+                  help="log only categories matching CATEGORY pattern")
+parser.add_option("-i", "--ignore", action="append", default=[],
+                  help="ignore tests matching IGNORE pattern")
+parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
+                  default=[],
+                  help="ignore tests matching patterns in IFILE")
+
+class Config:
+
+  def __init__(self):
+    self.broker = URL("localhost")
+    self.work = None
+    self.log_file = None
+    self.log_level = WARN
+    self.log_categories = []
+
+opts, args = parser.parse_args()
+
+includes = []
+excludes = ["*__*__"]
+config = Config()
+list_only = opts.list
+config.broker = URL(opts.broker)
+config.log_file = opts.log_file
+config.log_level = levels[opts.log_level.upper()]
+config.log_categories = opts.log_categories
+excludes.extend([v.strip() for v in opts.ignore])
+for v in opts.ignore_file:
+  f = open(v)
+  for line in f:
+    line = line.strip()
+    if line.startswith("#"):
+      continue
+    excludes.append(line)
+  f.close()
+
+for a in args:
+  includes.append(a.strip())
+
+if not includes:
+  includes.append("*")
+
+def included(path):
+  for p in excludes:
+    if match(path, p):
+      return False
+  for p in includes:
+    if match(path, p):
+      return True
+  return False
+
+def vt100_attrs(*attrs):
+  return "\x1B[%sm" % ";".join(map(str, attrs))
+
+vt100_reset = vt100_attrs(0)
+
+KEYWORDS = {"pass": (32,),
+            "fail": (31,),
+            "start": (34,),
+            "total": (34,)}
+
+COLORIZE = sys.stdout.isatty()
+
+def colorize_word(word, text=None):
+  if text is None:
+    text = word
+  return colorize(text, *KEYWORDS.get(word, ()))
+
+def colorize(text, *attrs):
+  if attrs and COLORIZE:
+    return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset)
+  else:
+    return text
+
+def indent(text):
+  lines = text.split("\n")
+  return "  %s" % "\n  ".join(lines)
+
+from qpid.testlib import testrunner
+testrunner.setBroker(str(config.broker))
+
+class Interceptor:
+
+  def __init__(self):
+    self.newline = False
+    self.indent = False
+    self.passthrough = True
+    self.dirty = False
+    self.last = None
+
+  def begin(self):
+    self.newline = True
+    self.indent = True
+    self.passthrough = False
+    self.dirty = False
+    self.last = None
+
+  def reset(self):
+    self.newline = False
+    self.indent = False
+    self.passthrough = True
+
+class StreamWrapper:
+
+  def __init__(self, interceptor, stream, prefix="  "):
+    self.interceptor = interceptor
+    self.stream = stream
+    self.prefix = prefix
+
+  def isatty(self):
+    return self.stream.isatty()
+
+  def write(self, s):
+    if self.interceptor.passthrough:
+      self.stream.write(s)
+      return
+
+    if s:
+      self.interceptor.dirty = True
+
+    if self.interceptor.newline:
+      self.interceptor.newline = False
+      self.stream.write(" %s\n" % colorize_word("start"))
+      self.interceptor.indent = True
+    if self.interceptor.indent:
+      self.stream.write(self.prefix)
+    if s.endswith("\n"):
+      s = s.replace("\n", "\n%s" % self.prefix)[:-2]
+      self.interceptor.indent = True
+    else:
+      s = s.replace("\n", "\n%s" % self.prefix)
+      self.interceptor.indent = False
+    self.stream.write(s)
+
+    if s:
+      self.interceptor.last = s[-1]
+
+  def flush(self):
+    self.stream.flush()
+
+interceptor = Interceptor()
+
+out_wrp = StreamWrapper(interceptor, sys.stdout)
+err_wrp = StreamWrapper(interceptor, sys.stderr)
+
+out = sys.stdout
+err = sys.stderr
+sys.stdout = out_wrp
+sys.stderr = err_wrp
+
+class PatternFilter(Filter):
+
+  def __init__(self, *patterns):
+    Filter.__init__(self, patterns)
+    self.patterns = patterns
+
+  def filter(self, record):
+    if not self.patterns:
+      return True
+    for p in self.patterns:
+      if match(record.name, p):
+        return True
+    return False
+
+root = getLogger()
+handler = StreamHandler(sys.stdout)
+filter = PatternFilter(*config.log_categories)
+handler.addFilter(filter)
+handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
+root.addHandler(handler)
+root.setLevel(WARN)
+
+log = getLogger("qpid.test")
+
+class Runner:
+
+  def __init__(self):
+    self.exceptions = []
+
+  def passed(self):
+    return not self.exceptions
+
+  def failed(self):
+    return self.exceptions
+
+  def run(self, name, phase):
+    try:
+      phase()
+    except KeyboardInterrupt:
+      raise
+    except:
+      self.exceptions.append((name, sys.exc_info()))
+
+  def status(self):
+    if self.passed():
+      return "pass"
+    else:
+      return "fail"
+
+  def print_exceptions(self):
+    for name, info in self.exceptions:
+      print "Error during %s:" % name
+      print indent("".join(traceback.format_exception(*info))).rstrip()
+
+def run_test(name, test, config):
+  patterns = filter.patterns
+  level = root.level
+  filter.patterns = config.log_categories
+  root.setLevel(config.log_level)
+
+  parts = name.split(".")
+  line = None
+  output = ""
+  for part in parts:
+    if line:
+      if len(line) + len(part) >= 71:
+        output += "%s. \\\n" % line
+        line = "    %s" % part
+      else:
+        line = "%s.%s" % (line, part)
+    else:
+      line = part
+
+  if line:
+    output += "%s %s" % (line, ((72 - len(line))*"."))
+  sys.stdout.write(output)
+  sys.stdout.flush()
+  interceptor.begin()
+  try:
+    runner = test()
+  finally:
+    interceptor.reset()
+  if interceptor.dirty:
+    if interceptor.last != "\n":
+      sys.stdout.write("\n")
+    sys.stdout.write(output)
+  print " %s" % colorize_word(runner.status())
+  if runner.failed():
+    runner.print_exceptions()
+  root.setLevel(level)
+  filter.patterns = patterns
+  return runner.passed()
+
+class FunctionTest:
+
+  def __init__(self, test):
+    self.test = test
+
+  def name(self):
+    return "%s.%s" % (self.test.__module__, self.test.__name__)
+
+  def run(self):
+    return run_test(self.name(), self._run, config)
+
+  def _run(self):
+    runner = Runner()
+    runner.run("test", lambda: self.test(config))
+    return runner
+
+  def __repr__(self):
+    return "FunctionTest(%r)" % self.test
+
+class MethodTest:
+
+  def __init__(self, cls, method):
+    self.cls = cls
+    self.method = method
+
+  def name(self):
+    return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method)
+
+  def run(self):
+    return run_test(self.name(), self._run, config)
+
+  def _run(self):
+    runner = Runner()
+    inst = self.cls(self.method)
+    test = getattr(inst, self.method)
+
+    if hasattr(inst, "configure"):
+      runner.run("configure", lambda: inst.configure(config))
+      if runner.failed(): return runner
+    if hasattr(inst, "setUp"):
+      runner.run("setup", inst.setUp)
+      if runner.failed(): return runner
+    elif hasattr(inst, "setup"):
+      runner.run("setup", inst.setup)
+      if runner.failed(): return runner
+
+    runner.run("test", test)
+
+    if hasattr(inst, "tearDown"):
+      runner.run("teardown", inst.tearDown)
+    elif hasattr(inst, "teardown"):
+      runner.run("teardown", inst.teardown)
+
+    return runner
+
+  def __repr__(self):
+    return "MethodTest(%r, %r)" % (self.cls, self.method)
+
+class PatternMatcher:
+
+  def __init__(self, *patterns):
+    self.patterns = patterns
+
+  def matches(self, name):
+    for p in self.patterns:
+      if match(name, p):
+        return True
+    return False
+
+class FunctionScanner(PatternMatcher):
+
+  def inspect(self, obj):
+    return type(obj) == types.FunctionType and self.matches(name)
+
+  def descend(self, func):
+    return; yield
+
+  def extract(self, func):
+    yield FunctionTest(func)
+
+class ClassScanner(PatternMatcher):
+
+  def inspect(self, obj):
+    return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__)
+
+  def descend(self, cls):
+    return; yield
+
+  def extract(self, cls):
+    names = dir(cls)
+    names.sort()
+    for name in names:
+      obj = getattr(cls, name)
+      t = type(obj)
+      if t == types.MethodType and name.startswith("test"):
+        yield MethodTest(cls, name)
+
+class ModuleScanner:
+
+  def inspect(self, obj):
+    return type(obj) == types.ModuleType
+
+  def descend(self, obj):
+    names = dir(obj)
+    names.sort()
+    for name in names:
+      yield getattr(obj, name)
+
+  def extract(self, obj):
+    return; yield
+
+class Harness:
+
+  def __init__(self):
+    self.scanners = [
+      ModuleScanner(),
+      ClassScanner("*Test", "*Tests", "*TestCase"),
+      FunctionScanner("test_*")
+      ]
+    self.tests = []
+    self.scanned = []
+
+  def scan(self, *roots):
+    objects = list(roots)
+
+    while objects:
+      obj = objects.pop(0)
+      for s in self.scanners:
+        if s.inspect(obj):
+          self.tests.extend(s.extract(obj))
+          for child in s.descend(obj):
+            if not (child in self.scanned or child in objects):
+              objects.append(child)
+      self.scanned.append(obj)
+
+modules = "qpid.tests", "tests", "tests_0-10"
+h = Harness()
+for name in modules:
+  m = __import__(name, fromlist=["dummy"])
+  h.scan(m)
+
+filtered = [t for t in h.tests if included(t.name())]
+passed = 0
+failed = 0
+for t in filtered:
+  if list_only:
+    print t.name()
+  else:
+    if t.run():
+      passed += 1
+    else:
+      failed += 1
+
+if not list_only:
+  print colorize("Totals:", 1), \
+      colorize_word("total", "%s tests" % len(filtered)) + ",", \
+      colorize_word("pass", "%s passed" % passed) + ",", \
+      colorize_word("fail" if failed else "pass", "%s failed" % failed)

Propchange: qpid/trunk/qpid/python/qpid-python-test
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/python/qpid/debug.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/debug.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/debug.py (added)
+++ qpid/trunk/qpid/python/qpid/debug.py Tue Jun  2 14:24:57 2009
@@ -0,0 +1,45 @@
+import traceback, time, sys
+
+from threading import RLock
+
+def stackdump(*args):
+  print args
+  code = []
+  for threadId, stack in sys._current_frames().items():
+    code.append("\n# ThreadID: %s" % threadId)
+    for filename, lineno, name, line in traceback.extract_stack(stack):
+      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+      if line:
+        code.append("  %s" % (line.strip()))
+  print "\n".join(code)
+
+import signal
+signal.signal(signal.SIGQUIT, stackdump)
+
+#out = open("/tmp/stacks.txt", "write")
+
+class LoudLock:
+
+  def __init__(self):
+    self.lock = RLock()
+
+  def acquire(self, blocking=1):
+    import threading
+    while not self.lock.acquire(blocking=0):
+      time.sleep(1)
+      print >> out, "TRYING"
+#      print self.lock._RLock__owner, threading._active
+#      stackdump()
+      traceback.print_stack(None, None, out)
+      print >> out, "TRYING"
+    print >> out, "ACQUIRED"
+    traceback.print_stack(None, None, out)
+    print >> out, "ACQUIRED"
+    return True
+
+  def _is_owned(self):
+    return self.lock._is_owned()
+
+  def release(self):
+    self.lock.release()
+

Added: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (added)
+++ qpid/trunk/qpid/python/qpid/messaging.py Tue Jun  2 14:24:57 2009
@@ -0,0 +1,807 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+  - asynchronous send
+  - asynchronous error notification
+  - definition of the arguments for L{Session.sender} and L{Session.receiver}
+  - standard L{Message} properties
+  - L{Message} content encoding
+  - protocol negotiation/multiprotocol impl
+"""
+
+import connection, time, sys, traceback
+from codec010 import StringCodec
+from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from logging import getLogger
+from session import Client, INCOMPLETE
+from spec import SPEC
+from threading import Thread, RLock, Condition
+from util import connect
+
+log = getLogger("qpid.messaging")
+
+static = staticmethod
+
+def synchronized(meth):
+  def sync_wrapper(self, *args, **kwargs):
+    self.lock()
+    try:
+      return meth(self, *args, **kwargs)
+    finally:
+      self.unlock()
+  return sync_wrapper
+
+class Lockable(object):
+
+  def lock(self):
+    self._lock.acquire()
+
+  def unlock(self):
+    self._lock.release()
+
+  def wait(self, predicate, timeout=None):
+    passed = 0
+    start = time.time()
+    while not predicate():
+      if timeout is None:
+        # using the timed wait prevents keyboard interrupts from being
+        # blocked while waiting
+        self._condition.wait(3)
+      elif passed < timeout:
+        self._condition.wait(timeout - passed)
+      else:
+        return False
+      passed = time.time() - start
+    return True
+
+  def notify(self):
+    self._condition.notify()
+
+  def notifyAll(self):
+    self._condition.notifyAll()
+
+def default(value, default):
+  if value is None:
+    return default
+  else:
+    return value
+
+AMQP_PORT = 5672
+AMQPS_PORT = 5671
+
+class Connection(Lockable):
+
+  """
+  A Connection manages a group of L{Sessions<Session>} and connects
+  them with a remote endpoint.
+  """
+
+  @static
+  def open(host, port=None):
+    """
+    Creates an AMQP connection and connects it to the given host and port.
+
+    @type host: str
+    @param host: the name or ip address of the remote host
+    @type port: int
+    @param port: the port number of the remote host
+    @rtype: Connection
+    @return: a connected Connection
+    """
+    conn = Connection(host, port)
+    conn.connect()
+    return conn
+
+  def __init__(self, host, port=None):
+    """
+    Creates a connection. A newly created connection must be connected
+    with the Connection.connect() method before it can be started.
+
+    @type host: str
+    @param host: the name or ip address of the remote host
+    @type port: int
+    @param port: the port number of the remote host
+    @rtype: Connection
+    @return: a disconnected Connection
+    """
+    self.host = host
+    self.port = default(port, AMQP_PORT)
+    self.started = False
+    self._conn = None
+    self.id = str(uuid4())
+    self.session_counter = 0
+    self.sessions = {}
+    self._lock = RLock()
+    self._condition = Condition(self._lock)
+
+  @synchronized
+  def session(self, name=None):
+    """
+    Creates or retrieves the named session. If the name is omitted or
+    None, then a unique name is chosen based on a randomly generated
+    uuid.
+
+    @type name: str
+    @param name: the session name
+    @rtype: Session
+    @return: the named Session
+    """
+
+    if name is None:
+      name = "%s:%s" % (self.id, self.session_counter)
+      self.session_counter += 1
+    else:
+      name = "%s:%s" % (self.id, name)
+
+    if self.sessions.has_key(name):
+      return self.sessions[name]
+    else:
+      ssn = Session(self, name, self.started)
+      self.sessions[name] = ssn
+      if self._conn is not None:
+        ssn._attach()
+      return ssn
+
+  @synchronized
+  def _remove_session(self, ssn):
+    del self.sessions[ssn.name]
+
+  @synchronized
+  def connect(self):
+    """
+    Connect to the remote endpoint.
+    """
+    if self._conn is not None:
+      return
+    self._socket = connect(self.host, self.port)
+    self._conn = connection.Connection(self._socket)
+    self._conn.start()
+
+    for ssn in self.sessions.values():
+      ssn._attach()
+
+  @synchronized
+  def disconnect(self):
+    """
+    Disconnect from the remote endpoint.
+    """
+    if self._conn is not None:
+      self._conn.close()
+      self._conn = None
+    for ssn in self.sessions.values():
+      ssn._disconnected()
+
+  @synchronized
+  def connected(self):
+    """
+    Return true if the connection is connected, false otherwise.
+    """
+    return self._conn is not None
+
+  @synchronized
+  def start(self):
+    """
+    Start incoming message delivery for all sessions.
+    """
+    self.started = True
+    for ssn in self.sessions.values():
+      ssn.start()
+
+  @synchronized
+  def stop(self):
+    """
+    Stop incoming message deliveries for all sessions.
+    """
+    for ssn in self.sessions.values():
+      ssn.stop()
+    self.started = False
+
+  @synchronized
+  def close(self):
+    """
+    Close the connection and all sessions.
+    """
+    for ssn in self.sessions.values():
+      ssn.close()
+    self.disconnect()
+
+class Pattern:
+  """
+  The pattern filter matches the supplied wildcard pattern against a
+  message subject.
+  """
+
+  def __init__(self, value):
+    self.value = value
+
+  def _bind(self, ssn, exchange, queue):
+    ssn.exchange_bind(exchange=exchange, queue=queue,
+                      binding_key=self.value.replace("*", "#"))
+
+FILTER_DEFAULTS = {
+  "topic": Pattern("*")
+  }
+
+def delegate(session):
+  class Delegate(Client):
+
+    def message_transfer(self, cmd, headers, body):
+      session._message_transfer(cmd, headers, body)
+  return Delegate
+
+class Session(Lockable):
+
+  """
+  Sessions provide a linear context for sending and receiving
+  messages, and manage various Senders and Receivers.
+  """
+
+  def __init__(self, connection, name, started):
+    self.connection = connection
+    self.name = name
+    self.started = started
+    self._ssn = None
+    self.senders = []
+    self.receivers = []
+    self.closing = False
+    self.incoming = []
+    self.closed = False
+    self.unacked = []
+    self._lock = RLock()
+    self._condition = Condition(self._lock)
+    self.thread = Thread(target = self.run)
+    self.thread.setDaemon(True)
+    self.thread.start()
+
+  def __repr__(self):
+    return "<Session %s>" % self.name
+
+  def _attach(self):
+    self._ssn = self.connection._conn.session(self.name, delegate=delegate(self))
+    self._ssn.auto_sync = False
+    self._ssn.invoke_lock = self._lock
+    self._ssn.lock = self._lock
+    self._ssn.condition = self._condition
+    for link in self.senders + self.receivers:
+      link._link()
+
+  def _disconnected(self):
+    self._ssn = None
+    for link in self.senders + self.receivers:
+      link._disconnected()
+
+  @synchronized
+  def _message_transfer(self, cmd, headers, body):
+    m = Message010(body)
+    m.headers = headers
+    m.id = cmd.id
+    msg = self._decode(m)
+    rcv = self.receivers[int(cmd.destination)]
+    msg._receiver = rcv
+    log.debug("RECV [%s] %s", self, msg)
+    self.incoming.append(msg)
+    self.notifyAll()
+    return INCOMPLETE
+
+  def _decode(self, message):
+    dp = message.get("delivery_properties")
+    mp = message.get("message_properties")
+    ap = mp.application_headers
+    enc, dec = get_codec(mp.content_type)
+    content = dec(message.body)
+    msg = Message(content)
+    msg.id = mp.message_id
+    if ap is not None:
+      msg.to = ap.get("to")
+      msg.subject = ap.get("subject")
+    msg.user_id = mp.user_id
+    if mp.reply_to is not None:
+      msg.reply_to = reply_to2addr(mp.reply_to)
+    msg.correlation_id = mp.correlation_id
+    msg.properties = mp.application_headers
+    msg.content_type = mp.content_type
+    msg._transfer_id = message.id
+    return msg
+
+  def _exchange_query(self, address):
+    # XXX: auto sync hack is to avoid deadlock on future
+    result = self._ssn.exchange_query(name=address, sync=True)
+    self._ssn.sync()
+    return result.get()
+
+  @synchronized
+  def sender(self, target):
+    """
+    Creates a L{Sender} that may be used to send L{Messages<Message>}
+    to the specified target.
+
+    @type target: str
+    @param target: the target to which messages will be sent
+    @rtype: Sender
+    @return: a new Sender for the specified target
+    """
+    sender = Sender(self, len(self.senders), target)
+    self.senders.append(sender)
+    if self._ssn is not None:
+      sender._link()
+    return sender
+
+  @synchronized
+  def receiver(self, source, filter=None):
+    """
+    Creates a receiver that may be used to actively fetch or to listen
+    for the arrival of L{Messages<Message>} from the specified source.
+
+    @type source: str
+    @param source: the source of L{Messages<Message>}
+    @rtype: Receiver
+    @return: a new Receiver for the specified source
+    """
+    receiver = Receiver(self, len(self.receivers), source, filter,
+                        self.started)
+    self.receivers.append(receiver)
+    if self._ssn is not None:
+      receiver._link()
+    return receiver
+
+  def _peek(self, predicate):
+    for msg in self.incoming:
+      if predicate(msg):
+        return msg
+
+  def _pop(self, predicate):
+    i = 0
+    while i < len(self.incoming):
+      msg = self.incoming[i]
+      if predicate(msg):
+        del self.incoming[i]
+        return msg
+      else:
+        i += 1
+
+  @synchronized
+  def _get(self, predicate, timeout=None):
+    if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+                 timeout):
+      msg = self._pop(predicate)
+      if msg is not None:
+        self.unacked.append(msg)
+        log.debug("RETR [%s] %s", self, msg)
+        return msg
+    return None
+
+  @synchronized
+  def acknowledge(self, message=None):
+    """
+    Acknowledge the given L{Message}. If message is None, then all
+    unackednowledged messages on the session are acknowledged.
+
+    @type message: Message
+    @param message: the message to acknowledge or None
+    """
+    if message is None:
+      messages = self.unacked
+    else:
+      messages = [message]
+
+    ids = RangedSet(*[m._transfer_id for m in self.unacked])
+    for range in ids:
+      self._ssn.receiver._completed.add_range(range)
+    self._ssn.channel.session_completed(self._ssn.receiver._completed)
+    self._ssn.message_accept(ids, sync=True)
+    self._ssn.sync()
+
+    for m in messages:
+      try:
+        self.unacked.remove(m)
+      except ValueError:
+        pass
+
+  @synchronized
+  def start(self):
+    """
+    Start incoming message delivery for the session.
+    """
+    self.started = True
+    for rcv in self.receivers:
+      rcv.start()
+
+  @synchronized
+  def stop(self):
+    """
+    Stop incoming message delivery for the session.
+    """
+    for rcv in self.receivers:
+      rcv.stop()
+    # TODO: think about stopping individual receivers in listen mode
+    self.wait(lambda: self._peek(self._pred) is None)
+    self.started = False
+
+  def _pred(self, m):
+    return m._receiver.listener is not None
+
+  @synchronized
+  def run(self):
+    try:
+      while True:
+        msg = self._get(self._pred)
+        if msg is None:
+          break;
+        else:
+          msg._receiver.listener(msg)
+          if self._peek(self._pred) is None:
+            self.notifyAll()
+    finally:
+      self.closed = True
+      self.notifyAll()
+
+  @synchronized
+  def close(self):
+    """
+    Close the session.
+    """
+    for link in self.receivers + self.senders:
+      link.close()
+
+    self.closing = True
+    self.notifyAll()
+    self.wait(lambda: self.closed)
+    while self.thread.isAlive():
+      self.thread.join(3)
+    self.thread = None
+    self._ssn.close()
+    self._ssn = None
+    self.connection._remove_session(self)
+
+def parse_addr(address):
+  parts = address.split("/", 1)
+  if len(parts) == 1:
+    return parts[0], None
+  else:
+    return parts[0], parts[i1]
+
+def reply_to2addr(reply_to):
+  if reply_to.routing_key is None:
+    return reply_to.exchange
+  elif reply_to.exchange in (None, ""):
+    return reply_to.routing_key
+  else:
+    return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
+
+class Disconnected(Exception):
+  """
+  Exception raised when an operation is attempted that is illegal when
+  disconnected.
+  """
+  pass
+
+class Sender(Lockable):
+
+  """
+  Sends outgoing messages.
+  """
+
+  def __init__(self, session, index, target):
+    self.session = session
+    self.index = index
+    self.target = target
+    self.closed = False
+    self._ssn = None
+    self._exchange = None
+    self._routing_key = None
+    self._subject = None
+    self._lock = self.session._lock
+    self._condition = self.session._condition
+
+  def _link(self):
+    self._ssn = self.session._ssn
+    node, self._subject = parse_addr(self.target)
+    result = self.session._exchange_query(node)
+    if result.not_found:
+      # XXX: should check 'create' option
+      self._ssn.queue_declare(queue=node, durable=False, sync=True)
+      self._ssn.sync()
+      self._exchange = ""
+      self._routing_key = node
+    else:
+      self._exchange = node
+      self._routing_key = self._subject
+
+  def _disconnected(self):
+    self._ssn = None
+
+  @synchronized
+  def send(self, object):
+    """
+    Send a message. If the object passed in is of type L{unicode},
+    L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
+    L{Message} and sent. If it is of type L{Message}, it will be sent
+    directly.
+
+    @type object: unicode, str, list, dict, Message
+    @param object: the message or content to send
+    """
+
+    if self._ssn is None:
+      raise Disconnected()
+
+    if isinstance(object, Message):
+      message = object
+    else:
+      message = Message(object)
+    # XXX: what if subject is specified for a normal queue?
+    rk = message.subject if self._routing_key is None else self._routing_key
+    # XXX: do we need to query to figure out how to create the reply-to interoperably?
+    rt = self._ssn.reply_to(*parse_addr(message.reply_to)) if message.reply_to else None
+    dp = self._ssn.delivery_properties(routing_key=rk)
+    mp = self._ssn.message_properties(message_id=message.id,
+                                      user_id=message.user_id,
+                                      reply_to=rt,
+                                      correlation_id=message.correlation_id,
+                                      content_type=message.content_type,
+                                      application_headers=message.properties)
+    if message.subject is not None:
+      if mp.application_headers is None:
+        mp.application_headers = {}
+      mp.application_headers["subject"] = message.subject
+    if message.to is not None:
+      if mp.application_headers is None:
+        mp.application_headers = {}
+      mp.application_headers["to"] = message.to
+    enc, dec = get_codec(message.content_type)
+    body = enc(message.content)
+    self._ssn.message_transfer(destination=self._exchange,
+                               message=Message010(dp, mp, body),
+                               sync=True)
+    log.debug("SENT [%s] %s", self.session, message)
+    self._ssn.sync()
+
+  @synchronized
+  def close(self):
+    """
+    Close the Sender.
+    """
+    if not self.closed:
+      self.session.senders.remove(self)
+      self.closed = True
+
+class Empty(Exception):
+  """
+  Exception raised by L{Receiver.fetch} when there is no message
+  available within the alloted time.
+  """
+  pass
+
+class Receiver(Lockable):
+
+  """
+  Receives incoming messages from a remote source. Messages may be
+  actively fetched with L{fetch} or a listener may be installed with
+  L{listen}.
+  """
+
+  def __init__(self, session, index, source, filter, started):
+    self.session = session
+    self.index = index
+    self.destination = str(self.index)
+    self.source = source
+    self.filter = filter
+    self.started = started
+    self.closed = False
+    self.incoming = []
+    self.listener = None
+    self._ssn = None
+    self._queue = None
+    self._lock = self.session._lock
+    self._condition = self.session._condition
+
+  def _link(self):
+    self._ssn = self.session._ssn
+    result = self.session._exchange_query(self.source)
+    if result.not_found:
+      self._queue = self.source
+      # XXX: should check 'create' option
+      self._ssn.queue_declare(queue=self._queue, durable=False)
+    else:
+      self._queue = "%s.%s" % (self.session.name, self.destination)
+      self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True)
+      f = FILTER_DEFAULTS[result.type] if self.filter is None else self.filter
+      f._bind(self._ssn, self.source, self._queue)
+    self._ssn.message_subscribe(queue=self._queue, destination=self.destination,
+                                sync=True)
+    self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit)
+    self._ssn.sync()
+    if self.started:
+      self._start()
+
+  def _disconnected(self):
+    self._ssn = None
+
+  @synchronized
+  def listen(self, listener=None):
+    """
+    Sets the message listener for this receiver.
+
+    @type listener: callable
+    @param listener: a callable object to be notified on message arrival
+    """
+    self.listener = listener
+    if self.listener is None:
+      self._ssn.message_stop(self.destination)
+      self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL,
+                             sync=True)
+      self._ssn.sync()
+    else:
+      self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+
+  def _pred(self, msg):
+    return msg._receiver == self
+
+  @synchronized
+  def fetch(self, timeout=None):
+    """
+    Fetch and return a single message. A timeout of None will block
+    forever waiting for a message to arrive, a timeout of zero will
+    return immediately if no messages are available.
+
+    @type timeout: float
+    @param timeout: the time to wait for a message to be available
+    """
+    self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+                           0xFFFFFFFFL)
+    self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+    msg = self.session._get(self._pred, timeout=timeout)
+    if msg is None:
+      self._ssn.message_flush(self.destination)
+      self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+                             0xFFFFFFFFL, sync=True)
+      self._ssn.sync()
+      msg = self.session._get(self._pred, timeout=0)
+      if msg is None:
+        raise Empty()
+    return msg
+
+  def _start(self):
+    self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL)
+    if self.listener is not None:
+      self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+
+  @synchronized
+  def start(self):
+    """
+    Start incoming message delivery for this receiver.
+    """
+    self.started = True
+    if self._ssn is not None:
+      self._start()
+
+  def _stop(self):
+    self._ssn.message_stop(self.destination)
+
+  @synchronized
+  def stop(self):
+    """
+    Stop incoming message delivery for this receiver.
+    """
+    if self._ssn is not None:
+      self._stop()
+    self.started = False
+
+  @synchronized
+  def close(self):
+    """
+    Close the receiver.
+    """
+    if not self.closed:
+      self.closed = True
+      self._ssn.message_cancel(self.destination, sync=True)
+      self._ssn.sync()
+      self.session.receivers.remove(self)
+
+
+
+def codec(name):
+  type = SPEC.named[name]
+
+  def encode(x):
+    sc = StringCodec(SPEC)
+    type.encode(sc, x)
+    return sc.encoded
+
+  def decode(x):
+    sc = StringCodec(SPEC, x)
+    return type.decode(sc)
+
+  return encode, decode
+
+TYPE_MAPPINGS={
+  dict: "amqp/map",
+  list: "amqp/list",
+  unicode: "text/plain; charset=utf8",
+  buffer: None,
+  str: None,
+  None.__class__: None
+  }
+
+TYPE_CODEC={
+  "amqp/map": codec("map"),
+  "amqp/list": codec("list"),
+  "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+  None: (lambda x: x, lambda x: x)
+  }
+
+def get_type(content):
+  return TYPE_MAPPINGS[content.__class__]
+
+def get_codec(content_type):
+  return TYPE_CODEC[content_type]
+
+class Message:
+
+  """
+  A message consists of a standard set of fields, an application
+  defined set of properties, and some content.
+
+  @type id: str
+  @ivar id: the message id
+  @type user_id: ???
+  @ivar user_id: the user-id of the message producer
+  @type to: ???
+  @ivar to: ???
+  @type reply_to: ???
+  @ivar reply_to: ???
+  @type correlation_id: str
+  @ivar correlation_id: a correlation-id for the message
+  @type properties: dict
+  @ivar properties: application specific message properties
+  @type content_type: str
+  @ivar content_type: the content-type of the message
+  @type content: str, unicode, buffer, dict, list
+  @ivar content: the message content
+  """
+
+  def __init__(self, content=None):
+    """
+    Construct a new message with the supplied content. The
+    content-type of the message will be automatically inferred from
+    type of the content parameter.
+
+    @type content: str, unicode, buffer, dict, list
+    @param content: the message content
+    """
+    self.id = None
+    self.subject = None
+    self.user_id = None
+    self.to = None
+    self.reply_to = None
+    self.correlation_id = None
+    self.properties = {}
+    self.content_type = get_type(content)
+    self.content = content
+
+  def __repr__(self):
+    return "Message(%r)" % self.content
+
+__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
+           "Empty", "timestamp", "uuid4"]

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=781044&r1=781043&r2=781044&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Tue Jun  2 14:24:57 2009
@@ -412,7 +412,7 @@
         sock = connect(host or testrunner.host, port or testrunner.port)
         if testrunner.url.scheme == URL.AMQPS:
             sock = ssl(sock)
-        conn = Connection(sock, testrunner.spec, username=testrunner.user,
+        conn = Connection(sock, username=testrunner.user,
                           password=testrunner.password)
         conn.start(timeout=10)
         return conn

Added: qpid/trunk/qpid/python/qpid/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/__init__.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/__init__.py (added)
+++ qpid/trunk/qpid/python/qpid/tests/__init__.py Tue Jun  2 14:24:57 2009
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Test:
+
+  def __init__(self, name):
+    self.name = name
+
+  def configure(self, config):
+    self.config = config
+
+import messaging

Added: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (added)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Jun  2 14:24:57 2009
@@ -0,0 +1,402 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import time
+from qpid.tests import Test
+from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4
+from Queue import Queue, Empty as QueueEmpty
+
+class Base(Test):
+
+  def setup_connection(self):
+    return None
+
+  def setup_session(self):
+    return None
+
+  def setup_sender(self):
+    return None
+
+  def setup_receiver(self):
+    return None
+
+  def setup(self):
+    self.broker = self.config.broker
+    self.conn = self.setup_connection()
+    self.ssn = self.setup_session()
+    self.snd = self.setup_sender()
+    self.rcv = self.setup_receiver()
+
+  def teardown(self):
+    if self.conn is not None and self.conn.connected():
+      self.conn.close()
+
+  def ping(self, ssn):
+    # send a message
+    sender = ssn.sender("ping-queue")
+    content = "ping[%s]" % uuid4()
+    sender.send(content)
+    receiver = ssn.receiver("ping-queue")
+    msg = receiver.fetch(timeout=0)
+    ssn.acknowledge()
+    assert msg.content == content
+
+  def drain(self, rcv):
+    msgs = []
+    try:
+      while True:
+        msgs.append(rcv.fetch(0))
+    except Empty:
+      pass
+    return msgs
+
+class SetupTests(Base):
+
+  def testOpen(self):
+    # XXX: need to flesh out URL support/syntax
+    self.conn = Connection.open(self.broker.host, self.broker.port)
+    self.ping(self.conn.session())
+
+  def testConnect(self):
+    # XXX: need to flesh out URL support/syntax
+    self.conn = Connection(self.broker.host, self.broker.port)
+    self.conn.connect()
+    self.ping(self.conn.session())
+
+class ConnectionTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port)
+
+  def testSessionAnon(self):
+    ssn1 = self.conn.session()
+    ssn2 = self.conn.session()
+    self.ping(ssn1)
+    self.ping(ssn2)
+    assert ssn1 is not ssn2
+
+  def testSessionNamed(self):
+    ssn1 = self.conn.session("one")
+    ssn2 = self.conn.session("two")
+    self.ping(ssn1)
+    self.ping(ssn2)
+    assert ssn1 is not ssn2
+    assert ssn1 is self.conn.session("one")
+    assert ssn2 is self.conn.session("two")
+
+  def testDisconnect(self):
+    ssn = self.conn.session()
+    self.ping(ssn)
+    self.conn.disconnect()
+    import socket
+    try:
+      self.ping(ssn)
+      assert False, "ping succeeded"
+    except Disconnected:
+      # this is the expected failure when pinging on a disconnected
+      # connection
+      pass
+    self.conn.connect()
+    self.ping(ssn)
+
+  def testStart(self):
+    ssn = self.conn.session()
+    assert not ssn.started
+    self.conn.start()
+    assert ssn.started
+    ssn2 = self.conn.session()
+    assert ssn2.started
+
+  def testStop(self):
+    self.conn.start()
+    ssn = self.conn.session()
+    assert ssn.started
+    self.conn.stop()
+    assert not ssn.started
+    ssn2 = self.conn.session()
+    assert not ssn2.started
+
+  def testClose(self):
+    self.conn.close()
+    assert not self.conn.connected()
+
+class SessionTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port)
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def testSender(self):
+    snd = self.ssn.sender("test-snd-queue")
+    snd2 = self.ssn.sender(snd.target)
+    assert snd is not snd2
+    snd2.close()
+
+    content = "testSender[%s]" % uuid4()
+    snd.send(content)
+    rcv = self.ssn.receiver(snd.target)
+    msg = rcv.fetch(0)
+    assert msg.content == content
+    self.ssn.acknowledge(msg)
+
+  def testReceiver(self):
+    rcv = self.ssn.receiver("test-rcv-queue")
+    rcv2 = self.ssn.receiver(rcv.source)
+    assert rcv is not rcv2
+    rcv2.close()
+
+    content = "testReceiver[%s]" % uuid4()
+    snd = self.ssn.sender(rcv.source)
+    snd.send(content)
+    msg = rcv.fetch(0)
+    assert msg.content == content
+    self.ssn.acknowledge(msg)
+
+  def testStart(self):
+    rcv = self.ssn.receiver("test-start-queue")
+    assert not rcv.started
+    self.ssn.start()
+    assert rcv.started
+    rcv = self.ssn.receiver("test-start-queue")
+    assert rcv.started
+
+  def testStop(self):
+    self.ssn.start()
+    rcv = self.ssn.receiver("test-stop-queue")
+    assert rcv.started
+    self.ssn.stop()
+    assert not rcv.started
+    rcv = self.ssn.receiver("test-stop-queue")
+    assert not rcv.started
+
+  # XXX, we need a convenient way to assert that required queues are
+  # empty on setup, and possibly also to drain queues on teardown
+  def testAcknowledge(self):
+    # send a bunch of messages
+    snd = self.ssn.sender("test-ack-queue")
+    tid = "a"
+    contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+    for c in contents:
+      snd.send(c)
+
+    # drain the queue, verify the messages are there and then close
+    # without acking
+    rcv = self.ssn.receiver(snd.target)
+    msgs = self.drain(rcv)
+    assert contents == [m.content for m in msgs]
+    self.ssn.close()
+
+    # drain the queue again, verify that they are all the messages
+    # were requeued, and ack this time before closing
+    self.ssn = self.conn.session()
+    rcv = self.ssn.receiver("test-ack-queue")
+    msgs = self.drain(rcv)
+    assert contents == [m.content for m in msgs]
+    self.ssn.acknowledge()
+    self.ssn.close()
+
+    # drain the queue a final time and verify that the messages were
+    # dequeued
+    self.ssn = self.conn.session()
+    rcv = self.ssn.receiver("test-ack-queue")
+    msgs = self.drain(rcv)
+    assert len(msgs) == 0
+
+  def testClose(self):
+    self.ssn.close()
+    try:
+      self.ping(self.ssn)
+      assert False, "ping succeeded"
+    except Disconnected:
+      pass
+
+class ReceiverTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port)
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def setup_sender(self):
+    return self.ssn.sender("test-receiver-queue")
+
+  def setup_receiver(self):
+    return self.ssn.receiver("test-receiver-queue")
+
+  def testListen(self):
+    msgs = Queue()
+    def listener(m):
+      msgs.put(m)
+      self.ssn.acknowledge(m)
+    self.rcv.listen(listener)
+    content = "testListen[%s]" % uuid4()
+    self.snd.send(content)
+    try:
+      msg = msgs.get(timeout=3)
+      assert False, "did not expect message: %s" % msg
+    except QueueEmpty:
+      pass
+    self.rcv.start()
+    msg = msgs.get(timeout=3)
+    assert msg.content == content
+
+  def testFetch(self):
+    try:
+      msg = self.rcv.fetch(0)
+      assert False, "unexpected message: %s" % msg
+    except Empty:
+      pass
+    try:
+      start = time.time()
+      msg = self.rcv.fetch(3)
+      assert False, "unexpected message: %s" % msg
+    except Empty:
+      elapsed = time.time() - start
+      assert elapsed >= 3
+
+    content = "testListen[%s]" % uuid4()
+    for i in range(3):
+      self.snd.send(content)
+    msg = self.rcv.fetch(0)
+    assert msg.content == content
+    msg = self.rcv.fetch(3)
+    assert msg.content == content
+    msg = self.rcv.fetch()
+    assert msg.content == content
+    self.ssn.acknowledge()
+
+  # XXX: need testStart, testStop and testClose
+
+class MessageTests(Base):
+
+  def testCreateString(self):
+    m = Message("string")
+    assert m.content == "string"
+    assert m.content_type is None
+
+  def testCreateUnicode(self):
+    m = Message(u"unicode")
+    assert m.content == u"unicode"
+    assert m.content_type == "text/plain; charset=utf8"
+
+  def testCreateMap(self):
+    m = Message({})
+    assert m.content == {}
+    assert m.content_type == "amqp/map"
+
+  def testCreateList(self):
+    m = Message([])
+    assert m.content == []
+    assert m.content_type == "amqp/list"
+
+  def testContentTypeOverride(self):
+    m = Message()
+    m.content_type = "text/html; charset=utf8"
+    m.content = u"<html/>"
+    assert m.content_type == "text/html; charset=utf8"
+
+class MessageEchoTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port)
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def setup_sender(self):
+    return self.ssn.sender("test-message-echo-queue")
+
+  def setup_receiver(self):
+    return self.ssn.receiver("test-message-echo-queue")
+
+  def check(self, msg):
+    self.snd.send(msg)
+    echo = self.rcv.fetch(0)
+
+    assert msg.id == echo.id
+    assert msg.subject == echo.subject
+    assert msg.user_id == echo.user_id
+    assert msg.to == echo.to
+    assert msg.reply_to == echo.reply_to
+    assert msg.correlation_id == echo.correlation_id
+    assert msg.properties == echo.properties
+    assert msg.content_type == echo.content_type
+    assert msg.content == echo.content
+
+    self.ssn.acknowledge(echo)
+
+  def testStringContent(self):
+    self.check(Message("string"))
+
+  def testUnicodeContent(self):
+    self.check(Message(u"unicode"))
+
+
+  TEST_MAP = {"key1": "string",
+              "key2": u"unicode",
+              "key3": 3,
+              "key4": -3,
+              "key5": 3.14,
+              "key6": -3.14,
+              "key7": ["one", 2, 3.14],
+              "key8": []}
+
+  def testMapContent(self):
+    self.check(Message(MessageEchoTests.TEST_MAP))
+
+  def testListContent(self):
+    self.check(Message([]))
+    self.check(Message([1, 2, 3]))
+    self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+  def testProperties(self):
+    msg = Message()
+    msg.to = "to-address"
+    msg.subject = "subject"
+    msg.correlation_id = str(uuid4())
+    msg.properties = MessageEchoTests.TEST_MAP
+    msg.reply_to = "reply-address"
+    self.check(msg)
+
+class TestTestsXXX(Test):
+
+  def testFoo(self):
+    print "this test has output"
+
+  def testBar(self):
+    print "this test "*8
+    print "has"*10
+    print "a"*75
+    print "lot of"*10
+    print "output"*10
+
+  def testQux(self):
+    import sys
+    sys.stdout.write("this test has output with no newline")
+
+  def testQuxFail(self):
+    import sys
+    sys.stdout.write("this test has output with no newline")
+    fdsa



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org