You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/06/02 16:24:58 UTC
svn commit: r781044 - in /qpid/trunk/qpid/python: qpid-python-test
qpid/debug.py qpid/messaging.py qpid/testlib.py qpid/tests/
qpid/tests/__init__.py qpid/tests/messaging.py
Author: rhs
Date: Tue Jun 2 14:24:57 2009
New Revision: 781044
URL: http://svn.apache.org/viewvc?rev=781044&view=rev
Log:
first commit of new messaging API and test harness
Added:
qpid/trunk/qpid/python/qpid-python-test (with props)
qpid/trunk/qpid/python/qpid/debug.py
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/
qpid/trunk/qpid/python/qpid/tests/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified:
qpid/trunk/qpid/python/qpid/testlib.py
Added: qpid/trunk/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid-python-test?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid-python-test (added)
+++ qpid/trunk/qpid/python/qpid-python-test Tue Jun 2 14:24:57 2009
@@ -0,0 +1,450 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, logging, traceback, types
+from fnmatch import fnmatchcase as match
+from getopt import GetoptError
+from logging import getLogger, StreamHandler, Formatter, Filter, \
+ WARN, DEBUG, ERROR
+from qpid.util import URL
+
+levels = {
+ "DEBUG": DEBUG,
+ "WARN": WARN,
+ "ERROR": ERROR
+ }
+
+sorted_levels = [(v, k) for k, v in levels.items()]
+sorted_levels.sort()
+sorted_levels = [v for k, v in sorted_levels]
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="Run tests matching the specified PATTERNs.")
+parser.add_option("-l", "--list", action="store_true", default=False,
+ help="list tests instead of executing them")
+parser.add_option("-b", "--broker", default="localhost",
+ help="run tests against BROKER (default %default)")
+parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE")
+parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN",
+ help="only display log messages of LEVEL or higher severity: "
+ "%s (default %%default)" % ", ".join(sorted_levels))
+parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append",
+ dest="log_categories", default=[],
+ help="log only categories matching CATEGORY pattern")
+parser.add_option("-i", "--ignore", action="append", default=[],
+ help="ignore tests matching IGNORE pattern")
+parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
+ default=[],
+ help="ignore tests matching patterns in IFILE")
+
+class Config:
+
+ def __init__(self):
+ self.broker = URL("localhost")
+ self.work = None
+ self.log_file = None
+ self.log_level = WARN
+ self.log_categories = []
+
+opts, args = parser.parse_args()
+
+includes = []
+excludes = ["*__*__"]
+config = Config()
+list_only = opts.list
+config.broker = URL(opts.broker)
+config.log_file = opts.log_file
+config.log_level = levels[opts.log_level.upper()]
+config.log_categories = opts.log_categories
+excludes.extend([v.strip() for v in opts.ignore])
+for v in opts.ignore_file:
+ f = open(v)
+ for line in f:
+ line = line.strip()
+ if line.startswith("#"):
+ continue
+ excludes.append(line)
+ f.close()
+
+for a in args:
+ includes.append(a.strip())
+
+if not includes:
+ includes.append("*")
+
+def included(path):
+ for p in excludes:
+ if match(path, p):
+ return False
+ for p in includes:
+ if match(path, p):
+ return True
+ return False
+
+def vt100_attrs(*attrs):
+ return "\x1B[%sm" % ";".join(map(str, attrs))
+
+vt100_reset = vt100_attrs(0)
+
+KEYWORDS = {"pass": (32,),
+ "fail": (31,),
+ "start": (34,),
+ "total": (34,)}
+
+COLORIZE = sys.stdout.isatty()
+
+def colorize_word(word, text=None):
+ if text is None:
+ text = word
+ return colorize(text, *KEYWORDS.get(word, ()))
+
+def colorize(text, *attrs):
+ if attrs and COLORIZE:
+ return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset)
+ else:
+ return text
+
+def indent(text):
+ lines = text.split("\n")
+ return " %s" % "\n ".join(lines)
+
+from qpid.testlib import testrunner
+testrunner.setBroker(str(config.broker))
+
+class Interceptor:
+
+ def __init__(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+ self.dirty = False
+ self.last = None
+
+ def begin(self):
+ self.newline = True
+ self.indent = True
+ self.passthrough = False
+ self.dirty = False
+ self.last = None
+
+ def reset(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+
+class StreamWrapper:
+
+ def __init__(self, interceptor, stream, prefix=" "):
+ self.interceptor = interceptor
+ self.stream = stream
+ self.prefix = prefix
+
+ def isatty(self):
+ return self.stream.isatty()
+
+ def write(self, s):
+ if self.interceptor.passthrough:
+ self.stream.write(s)
+ return
+
+ if s:
+ self.interceptor.dirty = True
+
+ if self.interceptor.newline:
+ self.interceptor.newline = False
+ self.stream.write(" %s\n" % colorize_word("start"))
+ self.interceptor.indent = True
+ if self.interceptor.indent:
+ self.stream.write(self.prefix)
+ if s.endswith("\n"):
+ s = s.replace("\n", "\n%s" % self.prefix)[:-2]
+ self.interceptor.indent = True
+ else:
+ s = s.replace("\n", "\n%s" % self.prefix)
+ self.interceptor.indent = False
+ self.stream.write(s)
+
+ if s:
+ self.interceptor.last = s[-1]
+
+ def flush(self):
+ self.stream.flush()
+
+interceptor = Interceptor()
+
+out_wrp = StreamWrapper(interceptor, sys.stdout)
+err_wrp = StreamWrapper(interceptor, sys.stderr)
+
+out = sys.stdout
+err = sys.stderr
+sys.stdout = out_wrp
+sys.stderr = err_wrp
+
+class PatternFilter(Filter):
+
+ def __init__(self, *patterns):
+ Filter.__init__(self, patterns)
+ self.patterns = patterns
+
+ def filter(self, record):
+ if not self.patterns:
+ return True
+ for p in self.patterns:
+ if match(record.name, p):
+ return True
+ return False
+
+root = getLogger()
+handler = StreamHandler(sys.stdout)
+filter = PatternFilter(*config.log_categories)
+handler.addFilter(filter)
+handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
+root.addHandler(handler)
+root.setLevel(WARN)
+
+log = getLogger("qpid.test")
+
+class Runner:
+
+ def __init__(self):
+ self.exceptions = []
+
+ def passed(self):
+ return not self.exceptions
+
+ def failed(self):
+ return self.exceptions
+
+ def run(self, name, phase):
+ try:
+ phase()
+ except KeyboardInterrupt:
+ raise
+ except:
+ self.exceptions.append((name, sys.exc_info()))
+
+ def status(self):
+ if self.passed():
+ return "pass"
+ else:
+ return "fail"
+
+ def print_exceptions(self):
+ for name, info in self.exceptions:
+ print "Error during %s:" % name
+ print indent("".join(traceback.format_exception(*info))).rstrip()
+
+def run_test(name, test, config):
+ patterns = filter.patterns
+ level = root.level
+ filter.patterns = config.log_categories
+ root.setLevel(config.log_level)
+
+ parts = name.split(".")
+ line = None
+ output = ""
+ for part in parts:
+ if line:
+ if len(line) + len(part) >= 71:
+ output += "%s. \\\n" % line
+ line = " %s" % part
+ else:
+ line = "%s.%s" % (line, part)
+ else:
+ line = part
+
+ if line:
+ output += "%s %s" % (line, ((72 - len(line))*"."))
+ sys.stdout.write(output)
+ sys.stdout.flush()
+ interceptor.begin()
+ try:
+ runner = test()
+ finally:
+ interceptor.reset()
+ if interceptor.dirty:
+ if interceptor.last != "\n":
+ sys.stdout.write("\n")
+ sys.stdout.write(output)
+ print " %s" % colorize_word(runner.status())
+ if runner.failed():
+ runner.print_exceptions()
+ root.setLevel(level)
+ filter.patterns = patterns
+ return runner.passed()
+
+class FunctionTest:
+
+ def __init__(self, test):
+ self.test = test
+
+ def name(self):
+ return "%s.%s" % (self.test.__module__, self.test.__name__)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ runner.run("test", lambda: self.test(config))
+ return runner
+
+ def __repr__(self):
+ return "FunctionTest(%r)" % self.test
+
+class MethodTest:
+
+ def __init__(self, cls, method):
+ self.cls = cls
+ self.method = method
+
+ def name(self):
+ return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ inst = self.cls(self.method)
+ test = getattr(inst, self.method)
+
+ if hasattr(inst, "configure"):
+ runner.run("configure", lambda: inst.configure(config))
+ if runner.failed(): return runner
+ if hasattr(inst, "setUp"):
+ runner.run("setup", inst.setUp)
+ if runner.failed(): return runner
+ elif hasattr(inst, "setup"):
+ runner.run("setup", inst.setup)
+ if runner.failed(): return runner
+
+ runner.run("test", test)
+
+ if hasattr(inst, "tearDown"):
+ runner.run("teardown", inst.tearDown)
+ elif hasattr(inst, "teardown"):
+ runner.run("teardown", inst.teardown)
+
+ return runner
+
+ def __repr__(self):
+ return "MethodTest(%r, %r)" % (self.cls, self.method)
+
+class PatternMatcher:
+
+ def __init__(self, *patterns):
+ self.patterns = patterns
+
+ def matches(self, name):
+ for p in self.patterns:
+ if match(name, p):
+ return True
+ return False
+
+class FunctionScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) == types.FunctionType and self.matches(name)
+
+ def descend(self, func):
+ return; yield
+
+ def extract(self, func):
+ yield FunctionTest(func)
+
+class ClassScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__)
+
+ def descend(self, cls):
+ return; yield
+
+ def extract(self, cls):
+ names = dir(cls)
+ names.sort()
+ for name in names:
+ obj = getattr(cls, name)
+ t = type(obj)
+ if t == types.MethodType and name.startswith("test"):
+ yield MethodTest(cls, name)
+
+class ModuleScanner:
+
+ def inspect(self, obj):
+ return type(obj) == types.ModuleType
+
+ def descend(self, obj):
+ names = dir(obj)
+ names.sort()
+ for name in names:
+ yield getattr(obj, name)
+
+ def extract(self, obj):
+ return; yield
+
+class Harness:
+
+ def __init__(self):
+ self.scanners = [
+ ModuleScanner(),
+ ClassScanner("*Test", "*Tests", "*TestCase"),
+ FunctionScanner("test_*")
+ ]
+ self.tests = []
+ self.scanned = []
+
+ def scan(self, *roots):
+ objects = list(roots)
+
+ while objects:
+ obj = objects.pop(0)
+ for s in self.scanners:
+ if s.inspect(obj):
+ self.tests.extend(s.extract(obj))
+ for child in s.descend(obj):
+ if not (child in self.scanned or child in objects):
+ objects.append(child)
+ self.scanned.append(obj)
+
+modules = "qpid.tests", "tests", "tests_0-10"
+h = Harness()
+for name in modules:
+ m = __import__(name, fromlist=["dummy"])
+ h.scan(m)
+
+filtered = [t for t in h.tests if included(t.name())]
+passed = 0
+failed = 0
+for t in filtered:
+ if list_only:
+ print t.name()
+ else:
+ if t.run():
+ passed += 1
+ else:
+ failed += 1
+
+if not list_only:
+ print colorize("Totals:", 1), \
+ colorize_word("total", "%s tests" % len(filtered)) + ",", \
+ colorize_word("pass", "%s passed" % passed) + ",", \
+ colorize_word("fail" if failed else "pass", "%s failed" % failed)
Propchange: qpid/trunk/qpid/python/qpid-python-test
------------------------------------------------------------------------------
svn:executable = *
Added: qpid/trunk/qpid/python/qpid/debug.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/debug.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/debug.py (added)
+++ qpid/trunk/qpid/python/qpid/debug.py Tue Jun 2 14:24:57 2009
@@ -0,0 +1,45 @@
+import traceback, time, sys
+
+from threading import RLock
+
+def stackdump(*args):
+ print args
+ code = []
+ for threadId, stack in sys._current_frames().items():
+ code.append("\n# ThreadID: %s" % threadId)
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+ if line:
+ code.append(" %s" % (line.strip()))
+ print "\n".join(code)
+
+import signal
+signal.signal(signal.SIGQUIT, stackdump)
+
+#out = open("/tmp/stacks.txt", "write")
+
+class LoudLock:
+
+ def __init__(self):
+ self.lock = RLock()
+
+ def acquire(self, blocking=1):
+ import threading
+ while not self.lock.acquire(blocking=0):
+ time.sleep(1)
+ print >> out, "TRYING"
+# print self.lock._RLock__owner, threading._active
+# stackdump()
+ traceback.print_stack(None, None, out)
+ print >> out, "TRYING"
+ print >> out, "ACQUIRED"
+ traceback.print_stack(None, None, out)
+ print >> out, "ACQUIRED"
+ return True
+
+ def _is_owned(self):
+ return self.lock._is_owned()
+
+ def release(self):
+ self.lock.release()
+
Added: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (added)
+++ qpid/trunk/qpid/python/qpid/messaging.py Tue Jun 2 14:24:57 2009
@@ -0,0 +1,807 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+ - asynchronous send
+ - asynchronous error notification
+ - definition of the arguments for L{Session.sender} and L{Session.receiver}
+ - standard L{Message} properties
+ - L{Message} content encoding
+ - protocol negotiation/multiprotocol impl
+"""
+
+import connection, time, sys, traceback
+from codec010 import StringCodec
+from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from logging import getLogger
+from session import Client, INCOMPLETE
+from spec import SPEC
+from threading import Thread, RLock, Condition
+from util import connect
+
+log = getLogger("qpid.messaging")
+
+static = staticmethod
+
+def synchronized(meth):
+ def sync_wrapper(self, *args, **kwargs):
+ self.lock()
+ try:
+ return meth(self, *args, **kwargs)
+ finally:
+ self.unlock()
+ return sync_wrapper
+
+class Lockable(object):
+
+ def lock(self):
+ self._lock.acquire()
+
+ def unlock(self):
+ self._lock.release()
+
+ def wait(self, predicate, timeout=None):
+ passed = 0
+ start = time.time()
+ while not predicate():
+ if timeout is None:
+ # using the timed wait prevents keyboard interrupts from being
+ # blocked while waiting
+ self._condition.wait(3)
+ elif passed < timeout:
+ self._condition.wait(timeout - passed)
+ else:
+ return False
+ passed = time.time() - start
+ return True
+
+ def notify(self):
+ self._condition.notify()
+
+ def notifyAll(self):
+ self._condition.notifyAll()
+
+def default(value, default):
+ if value is None:
+ return default
+ else:
+ return value
+
+AMQP_PORT = 5672
+AMQPS_PORT = 5671
+
+class Connection(Lockable):
+
+ """
+ A Connection manages a group of L{Sessions<Session>} and connects
+ them with a remote endpoint.
+ """
+
+ @static
+ def open(host, port=None):
+ """
+ Creates an AMQP connection and connects it to the given host and port.
+
+ @type host: str
+ @param host: the name or ip address of the remote host
+ @type port: int
+ @param port: the port number of the remote host
+ @rtype: Connection
+ @return: a connected Connection
+ """
+ conn = Connection(host, port)
+ conn.connect()
+ return conn
+
+ def __init__(self, host, port=None):
+ """
+ Creates a connection. A newly created connection must be connected
+ with the Connection.connect() method before it can be started.
+
+ @type host: str
+ @param host: the name or ip address of the remote host
+ @type port: int
+ @param port: the port number of the remote host
+ @rtype: Connection
+ @return: a disconnected Connection
+ """
+ self.host = host
+ self.port = default(port, AMQP_PORT)
+ self.started = False
+ self._conn = None
+ self.id = str(uuid4())
+ self.session_counter = 0
+ self.sessions = {}
+ self._lock = RLock()
+ self._condition = Condition(self._lock)
+
+ @synchronized
+ def session(self, name=None):
+ """
+ Creates or retrieves the named session. If the name is omitted or
+ None, then a unique name is chosen based on a randomly generated
+ uuid.
+
+ @type name: str
+ @param name: the session name
+ @rtype: Session
+ @return: the named Session
+ """
+
+ if name is None:
+ name = "%s:%s" % (self.id, self.session_counter)
+ self.session_counter += 1
+ else:
+ name = "%s:%s" % (self.id, name)
+
+ if self.sessions.has_key(name):
+ return self.sessions[name]
+ else:
+ ssn = Session(self, name, self.started)
+ self.sessions[name] = ssn
+ if self._conn is not None:
+ ssn._attach()
+ return ssn
+
+ @synchronized
+ def _remove_session(self, ssn):
+ del self.sessions[ssn.name]
+
+ @synchronized
+ def connect(self):
+ """
+ Connect to the remote endpoint.
+ """
+ if self._conn is not None:
+ return
+ self._socket = connect(self.host, self.port)
+ self._conn = connection.Connection(self._socket)
+ self._conn.start()
+
+ for ssn in self.sessions.values():
+ ssn._attach()
+
+ @synchronized
+ def disconnect(self):
+ """
+ Disconnect from the remote endpoint.
+ """
+ if self._conn is not None:
+ self._conn.close()
+ self._conn = None
+ for ssn in self.sessions.values():
+ ssn._disconnected()
+
+ @synchronized
+ def connected(self):
+ """
+ Return true if the connection is connected, false otherwise.
+ """
+ return self._conn is not None
+
+ @synchronized
+ def start(self):
+ """
+ Start incoming message delivery for all sessions.
+ """
+ self.started = True
+ for ssn in self.sessions.values():
+ ssn.start()
+
+ @synchronized
+ def stop(self):
+ """
+ Stop incoming message deliveries for all sessions.
+ """
+ for ssn in self.sessions.values():
+ ssn.stop()
+ self.started = False
+
+ @synchronized
+ def close(self):
+ """
+ Close the connection and all sessions.
+ """
+ for ssn in self.sessions.values():
+ ssn.close()
+ self.disconnect()
+
+class Pattern:
+ """
+ The pattern filter matches the supplied wildcard pattern against a
+ message subject.
+ """
+
+ def __init__(self, value):
+ self.value = value
+
+ def _bind(self, ssn, exchange, queue):
+ ssn.exchange_bind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#"))
+
+FILTER_DEFAULTS = {
+ "topic": Pattern("*")
+ }
+
+def delegate(session):
+ class Delegate(Client):
+
+ def message_transfer(self, cmd, headers, body):
+ session._message_transfer(cmd, headers, body)
+ return Delegate
+
+class Session(Lockable):
+
+ """
+ Sessions provide a linear context for sending and receiving
+ messages, and manage various Senders and Receivers.
+ """
+
+ def __init__(self, connection, name, started):
+ self.connection = connection
+ self.name = name
+ self.started = started
+ self._ssn = None
+ self.senders = []
+ self.receivers = []
+ self.closing = False
+ self.incoming = []
+ self.closed = False
+ self.unacked = []
+ self._lock = RLock()
+ self._condition = Condition(self._lock)
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def __repr__(self):
+ return "<Session %s>" % self.name
+
+ def _attach(self):
+ self._ssn = self.connection._conn.session(self.name, delegate=delegate(self))
+ self._ssn.auto_sync = False
+ self._ssn.invoke_lock = self._lock
+ self._ssn.lock = self._lock
+ self._ssn.condition = self._condition
+ for link in self.senders + self.receivers:
+ link._link()
+
+ def _disconnected(self):
+ self._ssn = None
+ for link in self.senders + self.receivers:
+ link._disconnected()
+
+ @synchronized
+ def _message_transfer(self, cmd, headers, body):
+ m = Message010(body)
+ m.headers = headers
+ m.id = cmd.id
+ msg = self._decode(m)
+ rcv = self.receivers[int(cmd.destination)]
+ msg._receiver = rcv
+ log.debug("RECV [%s] %s", self, msg)
+ self.incoming.append(msg)
+ self.notifyAll()
+ return INCOMPLETE
+
+ def _decode(self, message):
+ dp = message.get("delivery_properties")
+ mp = message.get("message_properties")
+ ap = mp.application_headers
+ enc, dec = get_codec(mp.content_type)
+ content = dec(message.body)
+ msg = Message(content)
+ msg.id = mp.message_id
+ if ap is not None:
+ msg.to = ap.get("to")
+ msg.subject = ap.get("subject")
+ msg.user_id = mp.user_id
+ if mp.reply_to is not None:
+ msg.reply_to = reply_to2addr(mp.reply_to)
+ msg.correlation_id = mp.correlation_id
+ msg.properties = mp.application_headers
+ msg.content_type = mp.content_type
+ msg._transfer_id = message.id
+ return msg
+
+ def _exchange_query(self, address):
+ # XXX: auto sync hack is to avoid deadlock on future
+ result = self._ssn.exchange_query(name=address, sync=True)
+ self._ssn.sync()
+ return result.get()
+
+ @synchronized
+ def sender(self, target):
+ """
+ Creates a L{Sender} that may be used to send L{Messages<Message>}
+ to the specified target.
+
+ @type target: str
+ @param target: the target to which messages will be sent
+ @rtype: Sender
+ @return: a new Sender for the specified target
+ """
+ sender = Sender(self, len(self.senders), target)
+ self.senders.append(sender)
+ if self._ssn is not None:
+ sender._link()
+ return sender
+
+ @synchronized
+ def receiver(self, source, filter=None):
+ """
+ Creates a receiver that may be used to actively fetch or to listen
+ for the arrival of L{Messages<Message>} from the specified source.
+
+ @type source: str
+ @param source: the source of L{Messages<Message>}
+ @rtype: Receiver
+ @return: a new Receiver for the specified source
+ """
+ receiver = Receiver(self, len(self.receivers), source, filter,
+ self.started)
+ self.receivers.append(receiver)
+ if self._ssn is not None:
+ receiver._link()
+ return receiver
+
+ def _peek(self, predicate):
+ for msg in self.incoming:
+ if predicate(msg):
+ return msg
+
+ def _pop(self, predicate):
+ i = 0
+ while i < len(self.incoming):
+ msg = self.incoming[i]
+ if predicate(msg):
+ del self.incoming[i]
+ return msg
+ else:
+ i += 1
+
+ @synchronized
+ def _get(self, predicate, timeout=None):
+ if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
+ msg = self._pop(predicate)
+ if msg is not None:
+ self.unacked.append(msg)
+ log.debug("RETR [%s] %s", self, msg)
+ return msg
+ return None
+
+ @synchronized
+ def acknowledge(self, message=None):
+ """
+ Acknowledge the given L{Message}. If message is None, then all
+ unackednowledged messages on the session are acknowledged.
+
+ @type message: Message
+ @param message: the message to acknowledge or None
+ """
+ if message is None:
+ messages = self.unacked
+ else:
+ messages = [message]
+
+ ids = RangedSet(*[m._transfer_id for m in self.unacked])
+ for range in ids:
+ self._ssn.receiver._completed.add_range(range)
+ self._ssn.channel.session_completed(self._ssn.receiver._completed)
+ self._ssn.message_accept(ids, sync=True)
+ self._ssn.sync()
+
+ for m in messages:
+ try:
+ self.unacked.remove(m)
+ except ValueError:
+ pass
+
+ @synchronized
+ def start(self):
+ """
+ Start incoming message delivery for the session.
+ """
+ self.started = True
+ for rcv in self.receivers:
+ rcv.start()
+
+ @synchronized
+ def stop(self):
+ """
+ Stop incoming message delivery for the session.
+ """
+ for rcv in self.receivers:
+ rcv.stop()
+ # TODO: think about stopping individual receivers in listen mode
+ self.wait(lambda: self._peek(self._pred) is None)
+ self.started = False
+
+ def _pred(self, m):
+ return m._receiver.listener is not None
+
+ @synchronized
+ def run(self):
+ try:
+ while True:
+ msg = self._get(self._pred)
+ if msg is None:
+ break;
+ else:
+ msg._receiver.listener(msg)
+ if self._peek(self._pred) is None:
+ self.notifyAll()
+ finally:
+ self.closed = True
+ self.notifyAll()
+
+ @synchronized
+ def close(self):
+ """
+ Close the session.
+ """
+ for link in self.receivers + self.senders:
+ link.close()
+
+ self.closing = True
+ self.notifyAll()
+ self.wait(lambda: self.closed)
+ while self.thread.isAlive():
+ self.thread.join(3)
+ self.thread = None
+ self._ssn.close()
+ self._ssn = None
+ self.connection._remove_session(self)
+
+def parse_addr(address):
+ parts = address.split("/", 1)
+ if len(parts) == 1:
+ return parts[0], None
+ else:
+ return parts[0], parts[i1]
+
+def reply_to2addr(reply_to):
+ if reply_to.routing_key is None:
+ return reply_to.exchange
+ elif reply_to.exchange in (None, ""):
+ return reply_to.routing_key
+ else:
+ return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
+
+class Disconnected(Exception):
+ """
+ Exception raised when an operation is attempted that is illegal when
+ disconnected.
+ """
+ pass
+
+class Sender(Lockable):
+
+ """
+ Sends outgoing messages.
+ """
+
+ def __init__(self, session, index, target):
+ self.session = session
+ self.index = index
+ self.target = target
+ self.closed = False
+ self._ssn = None
+ self._exchange = None
+ self._routing_key = None
+ self._subject = None
+ self._lock = self.session._lock
+ self._condition = self.session._condition
+
+ def _link(self):
+ self._ssn = self.session._ssn
+ node, self._subject = parse_addr(self.target)
+ result = self.session._exchange_query(node)
+ if result.not_found:
+ # XXX: should check 'create' option
+ self._ssn.queue_declare(queue=node, durable=False, sync=True)
+ self._ssn.sync()
+ self._exchange = ""
+ self._routing_key = node
+ else:
+ self._exchange = node
+ self._routing_key = self._subject
+
+ def _disconnected(self):
+ self._ssn = None
+
+ @synchronized
+ def send(self, object):
+ """
+ Send a message. If the object passed in is of type L{unicode},
+ L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
+ L{Message} and sent. If it is of type L{Message}, it will be sent
+ directly.
+
+ @type object: unicode, str, list, dict, Message
+ @param object: the message or content to send
+ """
+
+ if self._ssn is None:
+ raise Disconnected()
+
+ if isinstance(object, Message):
+ message = object
+ else:
+ message = Message(object)
+ # XXX: what if subject is specified for a normal queue?
+ rk = message.subject if self._routing_key is None else self._routing_key
+ # XXX: do we need to query to figure out how to create the reply-to interoperably?
+ rt = self._ssn.reply_to(*parse_addr(message.reply_to)) if message.reply_to else None
+ dp = self._ssn.delivery_properties(routing_key=rk)
+ mp = self._ssn.message_properties(message_id=message.id,
+ user_id=message.user_id,
+ reply_to=rt,
+ correlation_id=message.correlation_id,
+ content_type=message.content_type,
+ application_headers=message.properties)
+ if message.subject is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["subject"] = message.subject
+ if message.to is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["to"] = message.to
+ enc, dec = get_codec(message.content_type)
+ body = enc(message.content)
+ self._ssn.message_transfer(destination=self._exchange,
+ message=Message010(dp, mp, body),
+ sync=True)
+ log.debug("SENT [%s] %s", self.session, message)
+ self._ssn.sync()
+
+ @synchronized
+ def close(self):
+ """
+ Close the Sender.
+ """
+ if not self.closed:
+ self.session.senders.remove(self)
+ self.closed = True
+
+class Empty(Exception):
+ """
+ Exception raised by L{Receiver.fetch} when there is no message
+ available within the allotted time.
+ """
+ pass
+
+class Receiver(Lockable):
+
+ """
+ Receives incoming messages from a remote source. Messages may be
+ actively fetched with L{fetch} or a listener may be installed with
+ L{listen}.
+ """
+
+ def __init__(self, session, index, source, filter, started):
+ self.session = session
+ self.index = index
+ self.destination = str(self.index)
+ self.source = source
+ self.filter = filter
+ self.started = started
+ self.closed = False
+ self.incoming = []
+ self.listener = None
+ self._ssn = None
+ self._queue = None
+ self._lock = self.session._lock
+ self._condition = self.session._condition
+
+ def _link(self):
+ self._ssn = self.session._ssn
+ result = self.session._exchange_query(self.source)
+ if result.not_found:
+ self._queue = self.source
+ # XXX: should check 'create' option
+ self._ssn.queue_declare(queue=self._queue, durable=False)
+ else:
+ self._queue = "%s.%s" % (self.session.name, self.destination)
+ self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True)
+ f = FILTER_DEFAULTS[result.type] if self.filter is None else self.filter
+ f._bind(self._ssn, self.source, self._queue)
+ self._ssn.message_subscribe(queue=self._queue, destination=self.destination,
+ sync=True)
+ self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit)
+ self._ssn.sync()
+ if self.started:
+ self._start()
+
+ def _disconnected(self):
+ self._ssn = None
+
+ @synchronized
+ def listen(self, listener=None):
+ """
+ Sets the message listener for this receiver.
+
+ @type listener: callable
+ @param listener: a callable object to be notified on message arrival
+ """
+ self.listener = listener
+ if self.listener is None:
+ self._ssn.message_stop(self.destination)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL,
+ sync=True)
+ self._ssn.sync()
+ else:
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+
+ def _pred(self, msg):
+ return msg._receiver == self
+
+ @synchronized
+ def fetch(self, timeout=None):
+ """
+ Fetch and return a single message. A timeout of None will block
+ forever waiting for a message to arrive, a timeout of zero will
+ return immediately if no messages are available.
+
+ @type timeout: float
+ @param timeout: the time to wait for a message to be available
+ """
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+ 0xFFFFFFFFL)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+ msg = self.session._get(self._pred, timeout=timeout)
+ if msg is None:
+ self._ssn.message_flush(self.destination)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+ 0xFFFFFFFFL, sync=True)
+ self._ssn.sync()
+ msg = self.session._get(self._pred, timeout=0)
+ if msg is None:
+ raise Empty()
+ return msg
+
+ def _start(self):
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL)
+ if self.listener is not None:
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+
+ @synchronized
+ def start(self):
+ """
+ Start incoming message delivery for this receiver.
+ """
+ self.started = True
+ if self._ssn is not None:
+ self._start()
+
+ def _stop(self):
+ self._ssn.message_stop(self.destination)
+
+ @synchronized
+ def stop(self):
+ """
+ Stop incoming message delivery for this receiver.
+ """
+ if self._ssn is not None:
+ self._stop()
+ self.started = False
+
+ @synchronized
+ def close(self):
+ """
+ Close the receiver.
+ """
+ if not self.closed:
+ self.closed = True
+ self._ssn.message_cancel(self.destination, sync=True)
+ self._ssn.sync()
+ self.session.receivers.remove(self)
+
+
+
+def codec(name):
+ type = SPEC.named[name]
+
+ def encode(x):
+ sc = StringCodec(SPEC)
+ type.encode(sc, x)
+ return sc.encoded
+
+ def decode(x):
+ sc = StringCodec(SPEC, x)
+ return type.decode(sc)
+
+ return encode, decode
+
+TYPE_MAPPINGS={
+ dict: "amqp/map",
+ list: "amqp/list",
+ unicode: "text/plain; charset=utf8",
+ buffer: None,
+ str: None,
+ None.__class__: None
+ }
+
+TYPE_CODEC={
+ "amqp/map": codec("map"),
+ "amqp/list": codec("list"),
+ "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")),
+ None: (lambda x: x, lambda x: x)
+ }
+
+def get_type(content):
+ return TYPE_MAPPINGS[content.__class__]
+
+def get_codec(content_type):
+ return TYPE_CODEC[content_type]
+
+class Message:
+
+ """
+ A message consists of a standard set of fields, an application
+ defined set of properties, and some content.
+
+ @type id: str
+ @ivar id: the message id
+ @type user_id: ???
+ @ivar user_id: the user-id of the message producer
+ @type to: ???
+ @ivar to: ???
+ @type reply_to: ???
+ @ivar reply_to: ???
+ @type correlation_id: str
+ @ivar correlation_id: a correlation-id for the message
+ @type properties: dict
+ @ivar properties: application specific message properties
+ @type content_type: str
+ @ivar content_type: the content-type of the message
+ @type content: str, unicode, buffer, dict, list
+ @ivar content: the message content
+ """
+
+ def __init__(self, content=None):
+ """
+ Construct a new message with the supplied content. The
+ content-type of the message will be automatically inferred from
+ type of the content parameter.
+
+ @type content: str, unicode, buffer, dict, list
+ @param content: the message content
+ """
+ self.id = None
+ self.subject = None
+ self.user_id = None
+ self.to = None
+ self.reply_to = None
+ self.correlation_id = None
+ self.properties = {}
+ self.content_type = get_type(content)
+ self.content = content
+
+ def __repr__(self):
+ return "Message(%r)" % self.content
+
+__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
+ "Empty", "timestamp", "uuid4"]
Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=781044&r1=781043&r2=781044&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Tue Jun 2 14:24:57 2009
@@ -412,7 +412,7 @@
sock = connect(host or testrunner.host, port or testrunner.port)
if testrunner.url.scheme == URL.AMQPS:
sock = ssl(sock)
- conn = Connection(sock, testrunner.spec, username=testrunner.user,
+ conn = Connection(sock, username=testrunner.user,
password=testrunner.password)
conn.start(timeout=10)
return conn
Added: qpid/trunk/qpid/python/qpid/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/__init__.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/__init__.py (added)
+++ qpid/trunk/qpid/python/qpid/tests/__init__.py Tue Jun 2 14:24:57 2009
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Test:
+
+ """
+ Base class for tests in this package. It records the name it was
+ constructed with and the configuration handed to configure().
+ """
+
+ def __init__(self, name):
+ # NOTE(review): name is presumably the name of the test method
+ # to run -- confirm against the test harness.
+ self.name = name
+
+ def configure(self, config):
+ # Stored for subclasses to read, e.g. self.config.broker.
+ self.config = config
+
+import messaging
Added: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=781044&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (added)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Jun 2 14:24:57 2009
@@ -0,0 +1,402 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import time
+from qpid.tests import Test
+from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4
+from Queue import Queue, Empty as QueueEmpty
+
+class Base(Test):
+
+ """
+ Shared fixture for the messaging tests. setup() builds the
+ connection, session, sender and receiver through the setup_* hooks
+ below; each hook returns None unless a subclass overrides it.
+ """
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ # The hooks run in this order, so a later hook may use what an
+ # earlier one produced (e.g. setup_session may use self.conn).
+ self.broker = self.config.broker
+ self.conn = self.setup_connection()
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ # Close the connection only if there is one and it reports
+ # itself as connected.
+ if self.conn is not None and self.conn.connected():
+ self.conn.close()
+
+ def ping(self, ssn):
+ """
+ Send a uniquely tagged message to "ping-queue" on the supplied
+ session, fetch it back, acknowledge, and assert that the fetched
+ content matches what was sent.
+ """
+ # send a message
+ sender = ssn.sender("ping-queue")
+ content = "ping[%s]" % uuid4()
+ sender.send(content)
+ receiver = ssn.receiver("ping-queue")
+ msg = receiver.fetch(timeout=0)
+ ssn.acknowledge()
+ assert msg.content == content
+
+ def drain(self, rcv):
+ """
+ Fetch from rcv with a timeout of 0 until Empty is raised and
+ return the fetched messages as a list, in fetch order.
+ """
+ msgs = []
+ try:
+ while True:
+ msgs.append(rcv.fetch(0))
+ except Empty:
+ pass
+ return msgs
+
+class SetupTests(Base):
+
+ """
+ Exercises the two ways of obtaining a connection used in this
+ file. No setup_* hook is overridden, so each test creates its own
+ connection and stores it in self.conn for teardown to close.
+ """
+
+ def testOpen(self):
+ # Connection.open is used on its own here, with no separate
+ # connect() call before pinging.
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection.open(self.broker.host, self.broker.port)
+ self.ping(self.conn.session())
+
+ def testConnect(self):
+ # Construct the connection first, then connect explicitly.
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection(self.broker.host, self.broker.port)
+ self.conn.connect()
+ self.ping(self.conn.session())
+
+class ConnectionTests(Base):
+
+ """
+ Tests of connection level behaviour: session creation, disconnect
+ and reconnect, start/stop propagation to sessions, and close.
+ """
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def testSessionAnon(self):
+ # Two unnamed sessions must be distinct objects and both usable.
+ ssn1 = self.conn.session()
+ ssn2 = self.conn.session()
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+
+ def testSessionNamed(self):
+ # Asking for a name a second time must return the same session
+ # object that was returned the first time.
+ ssn1 = self.conn.session("one")
+ ssn2 = self.conn.session("two")
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+ assert ssn1 is self.conn.session("one")
+ assert ssn2 is self.conn.session("two")
+
+ def testDisconnect(self):
+ # A session must fail with Disconnected while the connection is
+ # down and work again, unchanged, after reconnecting.
+ ssn = self.conn.session()
+ self.ping(ssn)
+ self.conn.disconnect()
+ try:
+ self.ping(ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ # this is the expected failure when pinging on a disconnected
+ # connection
+ pass
+ self.conn.connect()
+ self.ping(ssn)
+
+ def testStart(self):
+ # Starting the connection starts existing sessions, and sessions
+ # created afterwards are started as well.
+ ssn = self.conn.session()
+ assert not ssn.started
+ self.conn.start()
+ assert ssn.started
+ ssn2 = self.conn.session()
+ assert ssn2.started
+
+ def testStop(self):
+ # Mirror image of testStart: stopping the connection stops
+ # existing sessions, and new sessions are created stopped.
+ self.conn.start()
+ ssn = self.conn.session()
+ assert ssn.started
+ self.conn.stop()
+ assert not ssn.started
+ ssn2 = self.conn.session()
+ assert not ssn2.started
+
+ def testClose(self):
+ self.conn.close()
+ assert not self.conn.connected()
+
+class SessionTests(Base):
+
+ """
+ Tests of session level behaviour: creating senders and receivers,
+ start/stop propagation to receivers, acknowledgement, and close.
+ """
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testSender(self):
+ # A second sender for the same target must be a distinct object,
+ # and closing it must leave the first sender usable.
+ snd = self.ssn.sender("test-snd-queue")
+ snd2 = self.ssn.sender(snd.target)
+ assert snd is not snd2
+ snd2.close()
+
+ content = "testSender[%s]" % uuid4()
+ snd.send(content)
+ rcv = self.ssn.receiver(snd.target)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testReceiver(self):
+ # A second receiver for the same source must be a distinct
+ # object, and closing it must leave the first receiver usable.
+ rcv = self.ssn.receiver("test-rcv-queue")
+ rcv2 = self.ssn.receiver(rcv.source)
+ assert rcv is not rcv2
+ rcv2.close()
+
+ content = "testReceiver[%s]" % uuid4()
+ snd = self.ssn.sender(rcv.source)
+ snd.send(content)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testStart(self):
+ # Starting the session starts existing receivers, and receivers
+ # created afterwards are started as well.
+ rcv = self.ssn.receiver("test-start-queue")
+ assert not rcv.started
+ self.ssn.start()
+ assert rcv.started
+ rcv = self.ssn.receiver("test-start-queue")
+ assert rcv.started
+
+ def testStop(self):
+ # Mirror image of testStart: stopping the session stops existing
+ # receivers, and new receivers are created stopped.
+ self.ssn.start()
+ rcv = self.ssn.receiver("test-stop-queue")
+ assert rcv.started
+ self.ssn.stop()
+ assert not rcv.started
+ rcv = self.ssn.receiver("test-stop-queue")
+ assert not rcv.started
+
+ # XXX, we need a convenient way to assert that required queues are
+ # empty on setup, and possibly also to drain queues on teardown
+ def testAcknowledge(self):
+ # send a bunch of messages
+ snd = self.ssn.sender("test-ack-queue")
+ tid = "a"
+ contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ for c in contents:
+ snd.send(c)
+
+ # drain the queue, verify the messages are there and then close
+ # without acking
+ rcv = self.ssn.receiver(snd.target)
+ msgs = self.drain(rcv)
+ assert contents == [m.content for m in msgs]
+ self.ssn.close()
+
+ # drain the queue again, verify that all the messages were
+ # requeued, and ack this time before closing
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver("test-ack-queue")
+ msgs = self.drain(rcv)
+ assert contents == [m.content for m in msgs]
+ self.ssn.acknowledge()
+ self.ssn.close()
+
+ # drain the queue a final time and verify that the messages were
+ # dequeued
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver("test-ack-queue")
+ msgs = self.drain(rcv)
+ assert len(msgs) == 0
+
+ def testClose(self):
+ # Using a closed session is expected to raise Disconnected.
+ self.ssn.close()
+ try:
+ self.ping(self.ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ pass
+
+class ReceiverTests(Base):
+
+ """
+ Tests of receiver behaviour. The sender and the receiver built by
+ the fixture both use "test-receiver-queue", so anything sent with
+ self.snd is expected to arrive on self.rcv.
+ """
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("test-receiver-queue")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("test-receiver-queue")
+
+ def testListen(self):
+ # Messages handed to the listener are collected in a thread-safe
+ # Queue so that the test can wait for them with a timeout.
+ msgs = Queue()
+ def listener(m):
+ msgs.put(m)
+ self.ssn.acknowledge(m)
+ self.rcv.listen(listener)
+ content = "testListen[%s]" % uuid4()
+ self.snd.send(content)
+ # The receiver has not been started yet, so nothing may reach
+ # the listener within the timeout.
+ try:
+ msg = msgs.get(timeout=3)
+ assert False, "did not expect message: %s" % msg
+ except QueueEmpty:
+ pass
+ # Once the receiver is started the message must be delivered.
+ self.rcv.start()
+ msg = msgs.get(timeout=3)
+ assert msg.content == content
+
+ def testFetch(self):
+ # An empty queue must raise Empty at once for a zero timeout ...
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ pass
+ # ... and only after the full timeout has elapsed otherwise.
+ try:
+ start = time.time()
+ msg = self.rcv.fetch(3)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ elapsed = time.time() - start
+ assert elapsed >= 3
+
+ # The payload is labelled with this test's own name so that a
+ # stray message on the shared queue can be traced to its origin.
+ content = "testFetch[%s]" % uuid4()
+ for i in range(3):
+ self.snd.send(content)
+ # One fetch per timeout style: zero, finite, and no argument.
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+ msg = self.rcv.fetch(3)
+ assert msg.content == content
+ msg = self.rcv.fetch()
+ assert msg.content == content
+ self.ssn.acknowledge()
+
+ # XXX: need testStart, testStop and testClose
+
+class MessageTests(Base):
+
+ """
+ Tests of Message construction. These pin down the content-type
+ that is inferred for each kind of content; no setup_* hook is
+ overridden, so the fixture opens no connection.
+ """
+
+ def testCreateString(self):
+ # A plain (byte) string gets no content-type at all.
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain; charset=utf8"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ # An explicitly assigned content-type must survive a later
+ # assignment to content.
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+class MessageEchoTests(Base):
+
+ """
+ Round-trip tests: each message is sent to
+ "test-message-echo-queue", fetched back from the same queue, and
+ compared with the original field by field.
+ """
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("test-message-echo-queue")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("test-message-echo-queue")
+
+ def check(self, msg):
+ """
+ Send msg, fetch the echo with a zero timeout, assert that every
+ standard field, the properties and the content are equal on both,
+ and acknowledge the echo.
+ """
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+
+ assert msg.id == echo.id
+ assert msg.subject == echo.subject
+ assert msg.user_id == echo.user_id
+ assert msg.to == echo.to
+ assert msg.reply_to == echo.reply_to
+ assert msg.correlation_id == echo.correlation_id
+ assert msg.properties == echo.properties
+ assert msg.content_type == echo.content_type
+ assert msg.content == echo.content
+
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ # Sample map mixing byte and unicode strings, positive and negative
+ # ints and floats, a non-empty list and an empty list. It is used
+ # both as message content and as message properties below.
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": []}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ # Empty, homogeneous, and mixed lists with a nested map.
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ # The message is built with no content; only the standard fields
+ # and the properties are set before the round trip.
+ msg = Message()
+ msg.to = "to-address"
+ msg.subject = "subject"
+ msg.correlation_id = str(uuid4())
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)
+
+class TestTestsXXX(Test):
+
+ """
+ Tests that only write to stdout and talk to no broker.
+ NOTE(review): these presumably exist to exercise how the test
+ harness displays captured output -- confirm.
+ """
+
+ def testFoo(self):
+ print "this test has output"
+
+ def testBar(self):
+ # Several long lines of output.
+ print "this test "*8
+ print "has"*10
+ print "a"*75
+ print "lot of"*10
+ print "output"*10
+
+ def testQux(self):
+ # Output that does not end with a newline.
+ import sys
+ sys.stdout.write("this test has output with no newline")
+
+ def testQuxFail(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+ # NOTE(review): fdsa is not defined anywhere in the visible code,
+ # so this line raises NameError. That looks like a deliberate
+ # failure following unterminated output -- confirm.
+ fdsa
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org