You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC

svn commit: r906961 [1/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/ python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/ tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/ tests/src/py/qpid_tests/broker_...

Author: rhs
Date: Fri Feb  5 15:08:44 2010
New Revision: 906961

URL: http://svn.apache.org/viewvc?rev=906961&view=rev
Log:
moved protocol tests from qpid/python to qpid/tests

Added:
    qpid/trunk/qpid/tests/
    qpid/trunk/qpid/tests/src/
    qpid/trunk/qpid/tests/src/py/
    qpid/trunk/qpid/tests/src/py/qpid_tests/
    qpid/trunk/qpid/tests/src/py/qpid_tests/__init__.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py   (with props)
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py   (with props)
Removed:
    qpid/trunk/qpid/python/tests_0-10/
    qpid/trunk/qpid/python/tests_0-8/
    qpid/trunk/qpid/python/tests_0-9/
Modified:
    qpid/trunk/qpid/cpp/src/tests/python_tests
    qpid/trunk/qpid/cpp/src/tests/run_acl_tests
    qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
    qpid/trunk/qpid/python/Makefile
    qpid/trunk/qpid/python/qpid-python-test

Modified: qpid/trunk/qpid/cpp/src/tests/python_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/python_tests?rev=906961&r1=906960&r2=906961&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/python_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/python_tests Fri Feb  5 15:08:44 2010
@@ -26,4 +26,4 @@
 PYTHON_TESTS=${PYTHON_TESTS:-$*}
 FAILING=${FAILING:-/dev/null}
 
-python $QPID_PYTHON_TEST -b localhost:$QPID_PORT -I $FAILING $PYTHON_TESTS || exit 1
+python $QPID_PYTHON_TEST -m qpid_tests.broker_0_10 -m qpid.tests -b localhost:$QPID_PORT -I $FAILING $PYTHON_TESTS || exit 1

Modified: qpid/trunk/qpid/cpp/src/tests/run_acl_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_acl_tests?rev=906961&r1=906960&r2=906961&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_acl_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_acl_tests Fri Feb  5 15:08:44 2010
@@ -53,8 +53,6 @@
     cp $srcdir/policy.acl $DATA_DIR
     start_brokers
     echo "Running acl tests using brokers on ports $LOCAL_PORT"
-    PYTHONPATH=$PYTHON_DIR:$srcdir
-    export PYTHONPATH
     $QPID_PYTHON_TEST -b localhost:$LOCAL_PORT -m acl || EXITCODE=1
     stop_brokers || EXITCODE=1
     test_loading_acl_from_absolute_path || EXITCODE=1

Modified: qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_env.sh.in?rev=906961&r1=906960&r2=906961&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_env.sh.in Fri Feb  5 15:08:44 2010
@@ -34,8 +34,10 @@
     export PYTHON_DIR=$top_srcdir/../python
     export QPID_PYTHON_TEST=$PYTHON_DIR/qpid-python-test
 fi
+export QPID_TESTS=$top_srcdir/../tests
+export QPID_TESTS_PY=$QPID_TESTS/src/py
 export PYTHON_COMMANDS=$PYTHON_DIR/commands
-export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$PYTHONPATH
+export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$PYTHONPATH
 export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
 export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
 export QPID_CLUSTER_EXEC=$PYTHON_COMMANDS/qpid-cluster

Modified: qpid/trunk/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/Makefile?rev=906961&r1=906960&r2=906961&view=diff
==============================================================================
--- qpid/trunk/qpid/python/Makefile (original)
+++ qpid/trunk/qpid/python/Makefile Fri Feb  5 15:08:44 2010
@@ -31,7 +31,7 @@
 AMQP_SPEC_DIR=$(PWD)/$(DATA_DIR)/amqp
 endif
 
-DIRS=qmf qpid mllib models examples tests tests_0-8 tests_0-9 tests_0-10
+DIRS=qmf qpid mllib models examples tests
 SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py
 BUILD=build
 TARGETS=$(SRCS:%.py=$(BUILD)/%.py)
@@ -79,18 +79,6 @@
 	install -pm 0644 $(BUILD)/tests/*.* $(PYTHON_LIB)/tests
 	$(PYCC) $(PYTHON_LIB)/tests
 
-	install -d $(PYTHON_LIB)/tests_0-8
-	install -pm 0644 $(BUILD)/tests_0-8/*.* $(PYTHON_LIB)/tests_0-8
-	$(PYCC) $(PYTHON_LIB)/tests_0-8
-
-	install -d $(PYTHON_LIB)/tests_0-9
-	install -pm 0644 $(BUILD)/tests_0-9/*.* $(PYTHON_LIB)/tests_0-9
-	$(PYCC) $(PYTHON_LIB)/tests_0-9
-
-	install -d $(PYTHON_LIB)/tests_0-10
-	install -pm 0644 $(BUILD)/tests_0-10/*.* $(PYTHON_LIB)/tests_0-10
-	$(PYCC) $(PYTHON_LIB)/tests_0-10
-
 	install -d $(EXEC_PREFIX)
 	install -pm 0755 qpid-python-test commands/* $(EXEC_PREFIX)
 

Modified: qpid/trunk/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid-python-test?rev=906961&r1=906960&r2=906961&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid-python-test (original)
+++ qpid/trunk/qpid/python/qpid-python-test Fri Feb  5 15:08:44 2010
@@ -107,7 +107,7 @@
   if opts.modules:
     includes.append("*")
   else:
-    includes.extend(["qpid.tests.*", "tests.*", "tests_0-10.*"])
+    includes.extend(["qpid.tests.*", "tests.*"])
 
 def is_ignored(path):
   for p in excludes:
@@ -512,7 +512,7 @@
 
 modules = opts.modules
 if not modules:
-  modules.extend(["qpid.tests", "tests", "tests_0-8", "tests_0-9", "tests_0-10"])
+  modules.extend(["qpid.tests", "tests"])
 h = Harness()
 for name in modules:
   m = __import__(name, None, None, ["dummy"])

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/__init__.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/__init__.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/__init__.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import broker_0_10, broker_0_9, broker_0_8

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Feb  5 15:08:44 2010
@@ -0,0 +1 @@
+*.pyc

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,31 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from alternate_exchange import *
+from broker import *
+from dtx import *
+from example import *
+from exchange import *
+from management import *
+from message import *
+from query import *
+from queue import *
+from tx import *

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import traceback
+from qpid.queue import Empty
+from qpid.datatypes import Message
+from qpid.testlib import TestBase010
+from qpid.session import SessionException
+
+class AlternateExchangeTests(TestBase010):
+    """
+    Tests for the new mechanism for message returns introduced in 0-10
+    and available in 0-9 for preview
+    """
+
+    def test_unroutable(self):
+        """
+        Test that unroutable messages are delivered to the alternate-exchange if specified
+        """
+        session = self.session
+        #create an exchange with an alternate defined
+        session.exchange_declare(exchange="secondary", type="fanout")
+        session.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
+
+        #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
+        session.queue_declare(queue="returns", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="returns", exchange="secondary")
+        session.message_subscribe(destination="a", queue="returns")
+        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        returned = session.incoming("a")
+
+        #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
+        session.queue_declare(queue="processed", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="processed", exchange="primary", binding_key="my-key")
+        session.message_subscribe(destination="b", queue="processed")
+        session.message_flow(destination="b", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="b", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        processed = session.incoming("b")
+
+        #publish to the primary exchange
+        #...one message that makes it to the 'processed' queue:
+        dp=self.session.delivery_properties(routing_key="my-key")
+        session.message_transfer(destination="primary", message=Message(dp, "Good"))
+        #...and one that does not:
+        dp=self.session.delivery_properties(routing_key="unused-key")
+        session.message_transfer(destination="primary", message=Message(dp, "Bad"))
+
+        #delete the exchanges
+        session.exchange_delete(exchange="primary")
+        session.exchange_delete(exchange="secondary")
+
+        #verify behaviour
+        self.assertEqual("Good", processed.get(timeout=1).body)
+        self.assertEqual("Bad", returned.get(timeout=1).body)
+        self.assertEmpty(processed)
+        self.assertEmpty(returned)
+
+    def test_queue_delete(self):
+        """
+        Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
+        """
+        session = self.session
+        #set up a 'dead letter queue':
+        session.exchange_declare(exchange="dlq", type="fanout")
+        session.queue_declare(queue="deleted", exclusive=True, auto_delete=True)
+        session.exchange_bind(exchange="dlq", queue="deleted")
+        session.message_subscribe(destination="dlq", queue="deleted")
+        session.message_flow(destination="dlq", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="dlq", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        dlq = session.incoming("dlq")
+
+        #create a queue using the dlq as its alternate exchange:
+        session.queue_declare(queue="delete-me", alternate_exchange="dlq")
+        #send it some messages:
+        dp=self.session.delivery_properties(routing_key="delete-me")
+        session.message_transfer(message=Message(dp, "One"))
+        session.message_transfer(message=Message(dp, "Two"))
+        session.message_transfer(message=Message(dp, "Three"))
+        #delete it:
+        session.queue_delete(queue="delete-me")
+        #delete the dlq exchange:
+        session.exchange_delete(exchange="dlq")
+
+        #check the messages were delivered to the dlq:
+        self.assertEqual("One", dlq.get(timeout=1).body)
+        self.assertEqual("Two", dlq.get(timeout=1).body)
+        self.assertEqual("Three", dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+
+    def test_delete_while_used_by_queue(self):
+        """
+        Ensure an exchange still in use as an alternate-exchange for a
+        queue can't be deleted
+        """
+        session = self.session
+        session.exchange_declare(exchange="alternate", type="fanout")
+
+        session2 = self.conn.session("alternate", 2)
+        session2.queue_declare(queue="q", alternate_exchange="alternate")
+        try:
+            session2.exchange_delete(exchange="alternate")
+            self.fail("Expected deletion of in-use alternate-exchange to fail")
+        except SessionException, e:
+            session = self.session
+            session.queue_delete(queue="q")
+            session.exchange_delete(exchange="alternate")
+            self.assertEquals(530, e.args[0].error_code)            
+
+
+    def test_delete_while_used_by_exchange(self):
+        """
+        Ensure an exchange still in use as an alternate-exchange for 
+        another exchange can't be deleted
+        """
+        session = self.session
+        session.exchange_declare(exchange="alternate", type="fanout")
+
+        session = self.conn.session("alternate", 2)
+        session.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate")
+        try:
+            session.exchange_delete(exchange="alternate")
+            self.fail("Expected deletion of in-use alternate-exchange to fail")
+        except SessionException, e:
+            session = self.session
+            session.exchange_delete(exchange="e")
+            session.exchange_delete(exchange="alternate")
+            self.assertEquals(530, e.args[0].error_code)
+
+
+    def test_modify_existing_exchange_alternate(self):
+        """
+        Ensure that attempting to modify an exchange to change
+        the alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="alt2", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        try:
+            # attempt to change the alternate on an already existing exchange
+            session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2")
+            self.fail("Expected changing an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt2")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_add_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exchange by adding
+        an alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="noalternate", type="fanout")
+        try:
+            # attempt to add an alternate on an already existing exchange
+            session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1")
+            self.fail("Expected adding an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="noalternate")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_del_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exchange by declaring
+        it again without an alternate does nothing
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        # attempt to re-declare without an alternate - silently ignore
+        session.exchange_declare(exchange="onealternate", type="fanout" )
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt1")
+
+
+    def assertEmpty(self, queue):
+        try:
+            msg = queue.get(timeout=1) 
+            self.fail("Queue not empty: " + msg)
+        except Empty: None

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+
+class BrokerTests(TestBase010):
+    """Tests for basic Broker functionality"""
+
+    def test_ack_and_no_ack(self):
+        """
+        First, this test tries to receive a message with a no-ack
+        consumer. Second, this test tries to explicitly receive and
+        acknowledge a message with an acknowledging consumer.
+        """
+        session = self.session
+        session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True)
+
+        # No ack consumer
+        ctag = "tag1"
+        session.message_subscribe(queue = "myqueue", destination = ctag)
+        session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        body = "test no-ack"
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body))
+        msg = session.incoming(ctag).get(timeout = 5)
+        self.assert_(msg.body == body)
+
+        # Acknowledging consumer
+        session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True)
+        ctag = "tag2"
+        session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1)
+        session.message_flow(destination=ctag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination=ctag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        body = "test ack"
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body))
+        msg = session.incoming(ctag).get(timeout = 5)
+        session.message_accept(RangedSet(msg.id))
+        self.assert_(msg.body == body)
+        
+    def test_simple_delivery_immediate(self):
+        """
+        Test simple message delivery where consume is issued before publish
+        """
+        session = self.session
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="test-queue", exchange="amq.fanout")
+        consumer_tag = "tag1"
+        session.message_subscribe(queue="test-queue", destination=consumer_tag)
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+        queue = session.incoming(consumer_tag)
+
+        body = "Immediate Delivery"
+        session.message_transfer("amq.fanout", None, None, Message(body))
+        msg = queue.get(timeout=5)
+        self.assert_(msg.body == body)
+
+    def test_simple_delivery_queued(self):
+        """
+        Test basic message delivery where publish is issued before consume
+        (i.e. requires queueing of the message)
+        """
+        session = self.session
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="test-queue", exchange="amq.fanout")
+        body = "Queued Delivery"
+        session.message_transfer("amq.fanout", None, None, Message(body))
+
+        consumer_tag = "tag1"
+        session.message_subscribe(queue="test-queue", destination=consumer_tag)
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+        queue = session.incoming(consumer_tag)
+        msg = queue.get(timeout=5)
+        self.assert_(msg.body == body)

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/broker.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,775 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
+from qpid.compat import set
+from struct import pack, unpack
+from time import sleep
+
+class DtxTests(TestBase010):
+    """
+    Tests for the amqp dtx related classes.
+
+    Tests of the form test_simple_xxx test the basic transactional
+    behaviour. The approach here is to 'swap' a message from one queue
+    to another by consuming and re-publishing in the same
+    transaction. That transaction is then completed in different ways
+    and the appropriate result verified.
+
+    The other tests enforce more specific rules and behaviour on a
+    per-method or per-field basis.        
+    """
+
+    XA_RBROLLBACK = 1
+    XA_RBTIMEOUT = 2
+    XA_OK = 0
+    tx_counter = 0
+
+    def reset_channel(self):
+        """
+        Close the current session and replace self.session with a new
+        session named "dtx-session" opened on self.conn.
+        """
+        self.session.close()
+        self.session = self.conn.session("dtx-session", 1)
+
+    def test_simple_commit(self):
+        """
+        Test basic one-phase commit behaviour.
+
+        A message with correlation id "commit" is swapped from queue-a
+        to queue-b under a transaction (see txswap). Both queues must
+        report zero messages until the transaction is committed with
+        one_phase=True; afterwards queue-b must hold exactly that
+        message and queue-a none.
+        """
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+        session = self.session
+        tx = self.xid("my-xid")
+        self.txswap(tx, "commit")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)
+
+        #should close and reopen session to ensure no unacked messages are held
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("commit", "queue-b")
+
+    def test_simple_prepare_commit(self):
+        """
+        Test basic two-phase commit behaviour.
+
+        As test_simple_commit, but the transaction is prepared first
+        and then committed with one_phase=False. Both queues must
+        report zero messages after the prepare; after the commit
+        queue-b must hold the message with correlation id
+        "prepare-commit".
+        """
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+        session = self.session
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-commit")
+
+        #prepare
+        self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)
+
+        #replace the session before inspecting the queues
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("prepare-commit", "queue-b")
+
+
+    def test_simple_rollback(self):
+        """
+        Test basic rollback behaviour.
+
+        A message with correlation id "rollback" is swapped from
+        queue-a to queue-b under a transaction (see txswap) which is
+        then rolled back. Both queues must report zero messages before
+        the rollback; afterwards the message must be back on queue-a
+        and queue-b must be empty.
+        """
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+        session = self.session
+        tx = self.xid("my-xid")
+        self.txswap(tx, "rollback")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
+        #replace the session before inspecting the queues
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("rollback", "queue-a")
+
+    def test_simple_prepare_rollback(self):
+        """
+        Test basic rollback behaviour after the transaction has been prepared.
+
+        As test_simple_rollback, but dtx_prepare is issued (and must
+        return XA_OK) before the rollback. The message, with
+        correlation id "prepare-rollback", must end up back on queue-a.
+        """
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+        session = self.session
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-rollback")
+
+        #prepare
+        self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+
+        #replace the session before inspecting the queues
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("prepare-rollback", "queue-a")
+
+    def test_select_required(self):
+        """
+        check that an error is flagged if select is not issued before
+        start or end
+
+        dtx_start is called without a preceding dtx_select; a
+        SessionException carrying error code 503 is expected.
+        """
+        session = self.session
+        tx = self.xid("dummy")
+        try:
+            session.dtx_start(xid=tx)
+
+            #if we get here we have failed, but need to do some cleanup:
+            session.dtx_end(xid=tx)
+            session.dtx_rollback(xid=tx)
+            self.fail("Session not selected for use with dtx, expected exception!")
+        except SessionException, e:
+            self.assertEquals(503, e.args[0].error_code)
+
+    def test_start_already_known(self):
+        """
+        Verify that an attempt to start an association with a
+        transaction that is already known is not allowed (unless the
+        join flag is set).
+
+        The same xid is started on self.session and then, without
+        join, on a session of a second connection; the second start
+        is expected to raise a SessionException with error code 530.
+        """
+        #create two sessions on different connection & select them for use with dtx:
+        session1 = self.session
+        session1.dtx_select()
+
+        other = self.connect()
+        session2 = other.session("other", 0)
+        session2.dtx_select()
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one session under that xid:
+        session1.dtx_start(xid=tx)
+        #then start on the other without the join set
+        failed = False
+        try:
+            session2.dtx_start(xid=tx)
+        except SessionException, e:
+            #remember the outcome; the assertion is made after cleanup
+            failed = True
+            error = e
+
+        #cleanup:
+        #NOTE(review): 'other' is only closed when the start unexpectedly
+        #succeeded; on the expected path it is left open here - confirm
+        #whether that is intentional
+        if not failed:
+            session2.dtx_end(xid=tx)
+            other.close()
+        session1.dtx_end(xid=tx)
+        session1.dtx_rollback(xid=tx)
+
+        #verification:
+        if failed: self.assertEquals(530, error.args[0].error_code)
+        else: self.fail("Xid already known, expected exception!")
+
+    def test_forget_xid_on_completion(self):
+        """
+        Verify that a xid is 'forgotten' - and can therefore be used
+        again - once it is completed.
+
+        Runs test_simple_commit to complete a transaction for global
+        id "my-xid", then starts, ends and rolls back a new xid built
+        from the same global id. Note that self.xid() generates a new
+        branch id on every call, so the second xid differs from the
+        first in its branch id.
+        """
+        #do some transactional work & complete the transaction
+        self.test_simple_commit()
+        # session has been reset, so reselect for use with dtx
+        self.session.dtx_select()
+
+        #start association for the same xid as the previously completed txn
+        tx = self.xid("my-xid")
+        self.session.dtx_start(xid=tx)
+        self.session.dtx_end(xid=tx)
+        self.session.dtx_rollback(xid=tx)
+
+    def test_start_join_and_resume(self):
+        """
+        Ensure the correct error is signalled when both the join and
+        resume flags are set on starting an association between a
+        session and a transaction.
+
+        A SessionException carrying error code 503 is expected.
+        """
+        session = self.session
+        session.dtx_select()
+        tx = self.xid("dummy")
+        try:
+            session.dtx_start(xid=tx, join=True, resume=True)
+            #failed, but need some cleanup:
+            session.dtx_end(xid=tx)
+            session.dtx_rollback(xid=tx)
+            self.fail("Join and resume both set, expected exception!")
+        except SessionException, e:
+            self.assertEquals(503, e.args[0].error_code)
+
+    def test_start_join(self):
+        """
+        Verify 'join' behaviour, where a session is associated with a
+        transaction that is already associated with another session.
+
+        Two sessions on the same connection work under one xid (the
+        second joins it): one swaps message 'a' from queue "one" to
+        "two", the other swaps 'b' from "two" to "one". After a
+        one-phase commit each queue must hold exactly the message that
+        was swapped into it.
+        """
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["one", "two"])
+        #create two sessions & select them for use with dtx:
+        session1 = self.session
+        session1.dtx_select()
+
+        session2 = self.conn.session("second", 2)
+        session2.dtx_select()
+
+        #setup
+        session1.queue_declare(queue="one", auto_delete=True)
+        session1.queue_declare(queue="two", auto_delete=True)
+        session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
+        session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one session under that xid:
+        session1.dtx_start(xid=tx)
+        #then start on the other with the join flag set
+        session2.dtx_start(xid=tx, join=True)
+
+        #do work through each session
+        self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
+        self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'
+
+        #mark end on both sessions
+        session1.dtx_end(xid=tx)
+        session2.dtx_end(xid=tx)
+
+        #commit and check
+        session1.dtx_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+        
+
+    def test_suspend_resume(self):
+        """
+        Test suspension and resumption of an association
+
+        Message 'a' is swapped from "one" to "two" under the
+        transaction, the association is ended with suspend=True, then
+        restarted with resume=True and 'b' is swapped from "two" to
+        "one". After a one-phase commit each queue must hold exactly
+        the message that was swapped into it.
+        """
+        session = self.session
+        session.dtx_select()
+
+        #setup
+        session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+        session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+        session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
+
+        tx = self.xid("dummy")
+
+        #first half of the work, then suspend the association
+        session.dtx_start(xid=tx)
+        self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+        session.dtx_end(xid=tx, suspend=True)
+
+        #resume the same xid and do the second half
+        session.dtx_start(xid=tx, resume=True)
+        self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+        session.dtx_end(xid=tx)
+
+        #commit and check
+        session.dtx_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_suspend_start_end_resume(self):
+        """
+        Test suspension and resumption of an association with work
+        done on another transaction when the first transaction is
+        suspended
+
+        NOTE(review): as written, the body performs the same steps as
+        test_suspend_resume; no second transaction is started or ended
+        between the suspend and the resume, so the scenario described
+        above is not actually exercised - confirm intent.
+        """
+        session = self.session
+        session.dtx_select()
+
+        #setup
+        session.queue_declare(queue="one", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="two", exclusive=True, auto_delete=True)
+        session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
+        session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))
+
+        tx = self.xid("dummy")
+
+        #first half of the work, then suspend the association
+        session.dtx_start(xid=tx)
+        self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
+        session.dtx_end(xid=tx, suspend=True)
+
+        #resume the same xid and do the second half
+        session.dtx_start(xid=tx, resume=True)
+        self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
+        session.dtx_end(xid=tx)
+
+        #commit and check
+        session.dtx_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_end_suspend_and_fail(self):
+        """
+        Verify that the correct error is signalled if the suspend and
+        fail flag are both set when disassociating a transaction from
+        the session
+
+        A SessionException carrying error code 503 is expected. The
+        transaction is afterwards rolled back from a session on a
+        fresh connection.
+        """
+        session = self.session
+        session.dtx_select()
+        tx = self.xid("suspend_and_fail")
+        session.dtx_start(xid=tx)
+        try:
+            session.dtx_end(xid=tx, suspend=True, fail=True)
+            self.fail("Suspend and fail both set, expected exception!")
+        except SessionException, e:
+            self.assertEquals(503, e.args[0].error_code)
+
+        #cleanup
+        #the rollback is issued through a new connection and session
+        other = self.connect()
+        session = other.session("cleanup", 1)
+        session.dtx_rollback(xid=tx)
+        session.close()
+        other.close()
+    
+
+    def test_end_unknown_xid(self):
+        """
+        Verifies that the correct exception is thrown when an attempt
+        is made to end the association for a xid not previously
+        associated with the session
+
+        A SessionException carrying error code 409 is expected.
+        """
+        session = self.session
+        session.dtx_select()
+        tx = self.xid("unknown-xid")
+        try:
+            #no dtx_start was issued for this xid
+            session.dtx_end(xid=tx)
+            self.fail("Attempted to end association with unknown xid, expected exception!")
+        except SessionException, e:
+            self.assertEquals(409, e.args[0].error_code)
+
+    def test_end(self):
+        """
+        Verify that the association is terminated by end and subsequent
+        operations are non-transactional
+
+        On a session named "alternate", message "one" is published
+        between dtx_start and dtx_end and message "two" after the
+        dtx_end. Only "two" must be visible before the transaction is
+        committed; after it has been consumed and the transaction
+        committed (via self.session), only "one" must be on the queue.
+        """
+        #separate session holding a subscription on the queue (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["tx-queue"])
+        session = self.conn.session("alternate", 1)
+        session.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+
+        #publish a message under a transaction
+        session.dtx_select()
+        tx = self.xid("dummy")
+        session.dtx_start(xid=tx)
+        session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
+        session.dtx_end(xid=tx)
+
+        #now that association with txn is ended, publish another message
+        session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))
+
+        #check the second message is available, but not the first
+        self.assertMessageCount(1, "tx-queue")
+        #self.subscribe is not defined in this class (presumably inherited from TestBase010)
+        self.subscribe(session, queue="tx-queue", destination="results")
+        msg = session.incoming("results").get(timeout=1)
+        self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
+        session.message_cancel(destination="results")
+        #ack the message then close the session
+        session.message_accept(RangedSet(msg.id))
+        session.close()
+
+        session = self.session
+        #commit the transaction and check that the first message (and
+        #only the first message) is then delivered
+        session.dtx_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "tx-queue")
+        self.assertMessageId("one", "tx-queue")
+
+    def test_invalid_commit_one_phase_true(self):
+        """
+        Test that a commit with one_phase = True is rejected if the
+        transaction in question has already been prepared.
+
+        The work is done through a session on a second connection; the
+        commit is expected to raise a SessionException with error code
+        409, after which the transaction is rolled back via
+        self.session.
+        """
+        other = self.connect()
+        tester = other.session("tester", 1)
+        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        tester.dtx_select()
+        tx = self.xid("dummy")
+        tester.dtx_start(xid=tx)
+        tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+        tester.dtx_end(xid=tx)
+        tester.dtx_prepare(xid=tx)
+        failed = False
+        try:
+            tester.dtx_commit(xid=tx, one_phase=True)
+        except SessionException, e:
+            #remember the outcome; the assertion is made after cleanup
+            failed = True
+            error = e
+
+        if failed:
+            #clean up the prepared transaction before asserting
+            self.session.dtx_rollback(xid=tx)
+            self.assertEquals(409, error.args[0].error_code)
+        else:
+            tester.close()
+            other.close()
+            self.fail("Invalid use of one_phase=True, expected exception!")
+
+    def test_invalid_commit_one_phase_false(self):
+        """
+        Test that a commit with one_phase = False is rejected if the
+        transaction in question has not yet been prepared.
+
+        The work is done through a session on a second connection; the
+        commit is expected to raise a SessionException with error code
+        409, after which the transaction is rolled back via
+        self.session.
+        """
+        other = self.connect()
+        tester = other.session("tester", 1)
+        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        tester.dtx_select()
+        tx = self.xid("dummy")
+        tester.dtx_start(xid=tx)
+        tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+        tester.dtx_end(xid=tx)
+        #no dtx_prepare is issued before the two-phase commit below
+        failed = False
+        try:
+            tester.dtx_commit(xid=tx, one_phase=False)
+        except SessionException, e:
+            #remember the outcome; the assertion is made after cleanup
+            failed = True
+            error = e
+
+        if failed:
+            #clean up the transaction before asserting
+            self.session.dtx_rollback(xid=tx)
+            self.assertEquals(409, error.args[0].error_code)
+        else:
+            tester.close()
+            other.close()
+            self.fail("Invalid use of one_phase=False, expected exception!")
+
+    def test_invalid_commit_not_ended(self):
+        """
+        Test that a commit fails if the xid is still associated with a session.
+
+        self.session starts the transaction and does not end it; a
+        session on a second connection then attempts the commit, which
+        is expected to raise a SessionException with error code 409.
+        """
+        other = self.connect()
+        tester = other.session("tester", 1)
+        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        self.session.dtx_select()
+        tx = self.xid("dummy")
+        self.session.dtx_start(xid=tx)
+        #NOTE(review): createMessage is handed 'tester', so the message it
+        #publishes goes through that session rather than self.session -
+        #confirm this is intended
+        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+        failed = False
+        try:
+            tester.dtx_commit(xid=tx, one_phase=False)
+        except SessionException, e:
+            #remember the outcome; the assertion is made after cleanup
+            failed = True
+            error = e
+
+        if failed:
+            #end and roll back the transaction before asserting
+            self.session.dtx_end(xid=tx)
+            self.session.dtx_rollback(xid=tx)
+            self.assertEquals(409, error.args[0].error_code)
+        else:
+            tester.close()
+            other.close()
+            self.fail("Commit should fail as xid is still associated!")
+
+    def test_invalid_rollback_not_ended(self):
+        """
+        Test that a rollback fails if the xid is still associated with a session.
+
+        self.session starts the transaction and does not end it; a
+        session on a second connection then attempts the rollback,
+        which is expected to raise a SessionException with error code
+        409.
+        """
+        other = self.connect()
+        tester = other.session("tester", 1)
+        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        self.session.dtx_select()
+        tx = self.xid("dummy")
+        self.session.dtx_start(xid=tx)
+        #NOTE(review): createMessage is handed 'tester', so the message it
+        #publishes goes through that session rather than self.session -
+        #confirm this is intended
+        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+        failed = False
+        try:
+            tester.dtx_rollback(xid=tx)
+        except SessionException, e:
+            #remember the outcome; the assertion is made after cleanup
+            failed = True
+            error = e
+
+        if failed:
+            #end and roll back the transaction before asserting
+            self.session.dtx_end(xid=tx)
+            self.session.dtx_rollback(xid=tx)
+            self.assertEquals(409, error.args[0].error_code)
+        else:
+            tester.close()
+            other.close()
+            self.fail("Rollback should fail as xid is still associated!")
+
+
+    def test_invalid_prepare_not_ended(self):
+        """
+        Test that a prepare fails if the xid is still associated with a session.        
+        """
+        other = self.connect()
+        tester = other.session("tester", 1)
+        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        self.session.dtx_select()
+        tx = self.xid("dummy")
+        self.session.dtx_start(xid=tx)
+        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
+
+        failed = False
+        try:
+            tester.dtx_prepare(xid=tx)
+        except SessionException, e:
+            failed = True
+            error = e
+
+        if failed:
+            self.session.dtx_end(xid=tx)
+            self.session.dtx_rollback(xid=tx)
+            self.assertEquals(409, error.args[0].error_code)
+        else:
+            tester.close()
+            other.close()
+            self.fail("Rollback should fail as xid is still associated!")
+
+    def test_implicit_end(self):
+        """
+        Test that an association is implicitly ended when the session
+        is closed (whether by exception or explicit client request)
+        and the transaction in question is marked as rollback only.
+
+        A second session consumes and publishes under the transaction
+        and is then closed without calling dtx_end. A prepare issued
+        from self.session must return XA_RBROLLBACK.
+        """
+        session1 = self.session
+        session2 = self.conn.session("other", 2)
+
+        #setup:
+        session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
+        tx = self.xid("dummy")
+
+        session2.dtx_select()
+        session2.dtx_start(xid=tx)
+        #consume message 'a' and publish 'b' after dtx_start
+        session2.message_subscribe(queue="dummy", destination="dummy")
+        session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1)
+        session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
+        msg = session2.incoming("dummy").get(timeout=1)
+        session2.message_accept(RangedSet(msg.id))
+        session2.message_cancel(destination="dummy")
+        session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
+        #close without an explicit dtx_end
+        session2.close()
+
+        self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
+        session1.dtx_rollback(xid=tx)
+
+    def test_get_timeout(self):
+        """
+        Check that get-timeout returns the correct value, (and that a
+        transaction with a timeout can complete normally)
+
+        The timeout must read 0 before being set and 60 after
+        dtx_set_timeout(timeout=60); the subsequent end and rollback
+        must both return XA_OK.
+        """
+        session = self.session
+        tx = self.xid("dummy")
+
+        session.dtx_select()
+        session.dtx_start(xid=tx)
+        self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout)
+        session.dtx_set_timeout(xid=tx, timeout=60)
+        self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
+        self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
+        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)
+        
+    def test_set_timeout(self):
+        """
+        Test the timeout of a transaction results in the expected
+        behaviour
+
+        A message is swapped from queue-a to queue-b under a
+        transaction whose timeout is set to 2; after sleeping for 3
+        seconds the message must be back on queue-a, and both dtx_end
+        and dtx_rollback must return XA_RBTIMEOUT.
+        """
+
+        #separate session holding subscriptions on both queues (see keepQueuesAlive)
+        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
+        #open new session to allow self.session to be used in checking the queue
+        session = self.conn.session("worker", 1)
+        #setup:
+        tx = self.xid("dummy")
+        session.queue_declare(queue="queue-a", auto_delete=True)
+        session.queue_declare(queue="queue-b", auto_delete=True)
+        session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))
+
+        session.dtx_select()
+        session.dtx_start(xid=tx)
+        self.swap(session, "queue-a", "queue-b")
+        session.dtx_set_timeout(xid=tx, timeout=2)
+        #sleep for longer than the timeout that was just set
+        sleep(3)
+        #check that the work has been rolled back already
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("timeout", "queue-a")
+        #check the correct codes are returned when we try to complete the txn
+        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
+        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)
+
+
+
+    def test_recover(self):
+        """
+        Test basic recover behaviour
+
+        Nine transactions (tx1..tx9) each publish one message; those
+        numbered 2, 5, 6 and 8 are prepared and the rest rolled back.
+        The in_doubt list returned by dtx_recover must contain the
+        global ids of all prepared transactions. Every recovered xid
+        is rolled back as cleanup.
+        """
+        session = self.session
+
+        session.dtx_select()
+        session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+
+        prepared = []
+        for i in range(1, 10):
+            tx = self.xid("tx%s" % (i))
+            session.dtx_start(xid=tx)
+            session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
+            session.dtx_end(xid=tx)
+            if i in [2, 5, 6, 8]:
+                session.dtx_prepare(xid=tx)
+                prepared.append(tx)
+            else:
+                session.dtx_rollback(xid=tx)
+
+        xids = session.dtx_recover().in_doubt
+
+        #rollback the prepared transactions returned by recover
+        for x in xids:
+            session.dtx_rollback(xid=x)
+
+        #validate against the expected list of prepared transactions
+        actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
+        expected = set([x.global_id for x in prepared])
+        intersection = actual.intersection(expected)
+
+        #fails only when an expected id is missing; ids recovered in
+        #addition to the expected ones are reported but do not by
+        #themselves cause a failure
+        if intersection != expected:
+            missing = expected.difference(actual)
+            extra = actual.difference(expected)
+            self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+    def test_bad_resume(self):
+        """
+        Test that a resume on a session not selected for use with dtx fails
+
+        If a SessionException is raised its error code must be 503.
+        NOTE(review): there is no self.fail() after the call, so the
+        test also passes when no exception is raised - confirm intent.
+        """
+        session = self.session
+        try:
+            session.dtx_start(resume=True)
+        except SessionException, e:
+            self.assertEquals(503, e.args[0].error_code)
+
+    def test_prepare_unknown(self):
+        """
+        Prepare a xid that was never started; if a SessionException is
+        raised its error code must be 404.
+
+        NOTE(review): there is no self.fail() after the call, so the
+        test also passes when no exception is raised - confirm intent.
+        """
+        session = self.session
+        try:
+            session.dtx_prepare(xid=self.xid("unknown"))
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def test_commit_unknown(self):
+        """
+        Commit a xid that was never started; if a SessionException is
+        raised its error code must be 404.
+
+        NOTE(review): there is no self.fail() after the call, so the
+        test also passes when no exception is raised - confirm intent.
+        """
+        session = self.session
+        try:
+            session.dtx_commit(xid=self.xid("unknown"))
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def test_rollback_unknown(self):
+        """
+        Roll back a xid that was never started; if a SessionException
+        is raised its error code must be 404.
+
+        NOTE(review): there is no self.fail() after the call, so the
+        test also passes when no exception is raised - confirm intent.
+        """
+        session = self.session
+        try:
+            session.dtx_rollback(xid=self.xid("unknown"))
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def test_get_timeout_unknown(self):
+        """
+        Query the timeout of a xid that was never started; if a
+        SessionException is raised its error code must be 404.
+
+        NOTE(review): there is no self.fail() after the call, so the
+        test also passes when no exception is raised - confirm intent.
+        """
+        session = self.session
+        try:
+            session.dtx_get_timeout(xid=self.xid("unknown"))
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def xid(self, txid):
+        DtxTests.tx_counter += 1
+        branchqual = "v%s" % DtxTests.tx_counter
+        return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
+
+    def txswap(self, tx, id):
+        """
+        Set up the scenario shared by the test_simple_xxx tests:
+        declare queue-a and queue-b, publish a message with
+        correlation id 'id' to queue-a, select the session for dtx,
+        then move that message to queue-b (see swap) between a
+        dtx_start and dtx_end for 'tx'. Both the start and the end
+        must return XA_OK. The transaction is left for the caller to
+        complete.
+        """
+        session = self.session
+        #declare two queues:
+        session.queue_declare(queue="queue-a", auto_delete=True)
+        session.queue_declare(queue="queue-b", auto_delete=True)
+
+        #put message with specified id on one queue:
+        dp=session.delivery_properties(routing_key="queue-a")
+        mp=session.message_properties(correlation_id=id)
+        session.message_transfer(message=Message(dp, mp, "DtxMessage"))
+
+        #start the transaction:
+        session.dtx_select()
+        self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+
+        #'swap' the message from one queue to the other, under that transaction:
+        self.swap(self.session, "queue-a", "queue-b")
+
+        #mark the end of the transactional work:
+        self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
+
+    def swap(self, session, src, dest):
+        """
+        Consume one message from queue 'src' on the given session,
+        accept it, and re-publish its body and correlation id with
+        routing key 'dest'. A temporary subscription named "temp-swap"
+        is used and cancelled again once the message has been
+        received.
+        """
+        #consume from src:
+        session.message_subscribe(destination="temp-swap", queue=src)
+        #credit for a single message only
+        session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1)
+        session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        msg = session.incoming("temp-swap").get(timeout=1)
+        session.message_cancel(destination="temp-swap")
+        session.message_accept(RangedSet(msg.id))
+        #todo: also complete at this point?
+
+        #re-publish to dest:
+        dp=session.delivery_properties(routing_key=dest)
+        mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
+        session.message_transfer(message=Message(dp, mp, msg.body))
+
+    def assertMessageCount(self, expected, queue):
+        self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)
+
+    def assertMessageId(self, expected, queue):
+        """
+        Assert that the next message delivered from 'queue' carries
+        the correlation id 'expected'. A subscription named "results"
+        is created on self.session with credit for one message, and
+        cancelled after the assertion. The received message is not
+        accepted here.
+        """
+        self.session.message_subscribe(queue=queue, destination="results")
+        self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1)
+        self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+        self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
+        #not reached if the assertion above fails
+        self.session.message_cancel(destination="results")
+
+    def getMessageProperty(self, msg, prop):
+        for h in msg.headers:
+            if hasattr(h, prop): return getattr(h, prop)
+        return None            
+
+    def keepQueuesAlive(self, names):
+        session = self.conn.session("nasty", 99)
+        for n in names:
+            session.queue_declare(queue=n, auto_delete=True)
+            session.message_subscribe(destination=n, queue=n)
+        return session
+        
+    def createMessage(self, session, key, id, body):
+        """
+        Publish a message on 'session' with routing key 'key',
+        correlation id 'id' and the given body.
+
+        Despite the name, nothing is returned: the message is
+        transferred here and the result is None.
+        NOTE(review): callers in this class pass this method's result
+        as the first positional argument of session.message_transfer,
+        which is therefore None - confirm whether returning the
+        Message (and letting the caller send it) was intended.
+        """
+        dp=session.delivery_properties(routing_key=key)
+        mp=session.message_properties(correlation_id=id)
+        session.message_transfer(message=Message(dp, mp, body))

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class ExampleTest (TestBase010):
+    """
+    An example Qpid test, illustrating the unittest framework and the
+    python Qpid client. The test class must inherit TestBase.  The
+    test code uses the Qpid client to interact with a qpid broker and
+    verify it behaves as expected.
+    """ 
+
+    def test_example(self):
+        """
+        An example test. Note that test functions must start with 'test_'
+        to be recognized by the test framework.
+        """
+
+        # By inheriting TestBase, self.client is automatically connected
+        # and self.session is automatically opened as session(1)
+        # Other session methods mimic the protocol.
+        session = self.session
+
+        # Now we can send regular commands. If you want to see what the method
+        # arguments mean or what other commands are available, you can use the
+        # python builtin help() method. For example:
+        #help(chan)
+        #help(chan.exchange_declare)
+
+        # If you want browse the available protocol methods without being
+        # connected to a live server you can use the amqp-doc utility:
+        #
+        #   Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+        #
+        #   Options:
+        #       -e, --regexp    use regex instead of glob when matching
+
+        # Now that we know what commands are available we can use them to
+        # interact with the server.
+
+        # Here we use ordinal arguments.
+        session.exchange_declare("test", "direct")
+
+        # Here we use keyword arguments.
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="test-queue", exchange="test", binding_key="key")
+
+        # Call Session.subscribe to register as a consumer.
+        # All the protocol methods return a message object. The message object
+        # has fields corresponding to the reply method fields, plus a content
+        # field that is filled if the reply includes content. In this case the
+        # interesting field is the consumer_tag.
+        session.message_subscribe(queue="test-queue", destination="consumer_tag")
+        session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+        # We can use the session.incoming(...) method to access the messages
+        # delivered for our consumer_tag.
+        queue = session.incoming("consumer_tag")
+
+        # Now lets publish a message and see if our consumer gets it. To do
+        # this we need to import the Message class.
+        delivery_properties = session.delivery_properties(routing_key="key")
+        sent = Message(delivery_properties, "Hello World!")
+        session.message_transfer(destination="test", message=sent)
+
+        # Now we'll wait for the message to arrive. We can use the timeout
+        # argument in case the server hangs. By default queue.get() will wait
+        # until a message arrives or the connection to the server dies.
+        msg = queue.get(timeout=10)
+
+        # And check that we got the right response with assertEqual
+        self.assertEqual(sent.body, msg.body)
+
+        # Now acknowledge the message.
+        session.message_accept(RangedSet(msg.id))
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/example.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,461 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging, traceback
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.client import Closed
+from qpid.session import SessionException
+
+
+class TestHelper(TestBase010):
+    def setUp(self):
+        TestBase010.setUp(self)
+        self.queues = []
+        self.exchanges = []
+        self.subscriptions = []
+
+    def tearDown(self):
+        try:
+            for s in self.subscriptions:
+                self.session.message_cancel(destination=s)
+            for ssn, q in self.queues:
+                ssn.queue_delete(queue=q)
+            for ssn, ex in self.exchanges:
+                ssn.exchange_delete(exchange=ex)
+        except:
+            print "Error on tearDown:"
+            print traceback.print_exc()
+        TestBase010.tearDown(self)
+
+    def createMessage(self, key="", body=""):
+        return Message(self.session.delivery_properties(routing_key=key), body)
+
+    def getApplicationHeaders(self, msg):
+        for h in msg.headers:
+            if hasattr(h, 'application_headers'): return getattr(h, 'application_headers')
+        return None            
+
+    def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
+        """
+        Publish to exchange and assert queue.get() returns the same message.
+        """
+        body = self.uniqueString()
+        dp=self.session.delivery_properties(routing_key=routing_key)
+        mp=self.session.message_properties(application_headers=properties)
+        self.session.message_transfer(destination=exchange, message=Message(dp, mp, body))
+        msg = queue.get(timeout=1)
+        self.assertEqual(body, msg.body)
+        if (properties):
+            self.assertEqual(properties, self.getApplicationHeaders(msg))
+
+    def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
+        """
+        Publish a message and consume it, assert it comes back intact.
+        Return the Queue object used to consume.
+        """
+        self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
+
+    def assertEmpty(self, queue):
+        """Assert that the queue is empty"""
+        try:
+            queue.get(timeout=1)
+            self.fail("Queue is not empty.")
+        except Queue.Empty: None              # Ignore
+        
+    def queue_declare(self, session=None, *args, **keys):
+        session = session or self.session
+        reply = session.queue_declare(*args, **keys)
+        self.queues.append((session, keys["queue"]))
+        return reply
+
+    def exchange_declare(self, session=None, ticket=0, exchange='',
+                         type='', passive=False, durable=False,
+                         auto_delete=False,
+                         arguments={}):
+        session = session or self.session
+        reply = session.exchange_declare(exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
+        self.exchanges.append((session,exchange))
+        return reply
+
+    def uniqueString(self):
+        """Generate a unique string, unique for this TestBase instance"""
+        if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+        return "Test Message " + str(self.uniqueCounter)
+
+    def consume(self, queueName):
+        """Consume from named queue returns the Queue object."""
+        if not "uniqueTag" in dir(self): self.uniqueTag = 1
+        else: self.uniqueTag += 1
+        consumer_tag = "tag" + str(self.uniqueTag)
+        self.session.message_subscribe(queue=queueName, destination=consumer_tag)
+        self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
+        self.session.message_flow(destination=consumer_tag, unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+        self.subscriptions.append(consumer_tag)
+        return self.session.incoming(consumer_tag)
+
+
+class StandardExchangeVerifier:
+    """Verifies standard exchange behavior.
+
+    Used as base class for classes that test standard exchanges."""
+
+    def verifyDirectExchange(self, ex):
+        """Verify that ex behaves like a direct exchange."""
+        self.queue_declare(queue="q")
+        self.session.exchange_bind(queue="q", exchange=ex, binding_key="k")
+        self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+        try:
+            self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+            self.fail("Expected Empty exception")
+        except Queue.Empty: None # Expected
+
+    def verifyFanOutExchange(self, ex):
+        """Verify that ex behaves like a fanout exchange."""
+        self.queue_declare(queue="q") 
+        self.session.exchange_bind(queue="q", exchange=ex)
+        self.queue_declare(queue="p") 
+        self.session.exchange_bind(queue="p", exchange=ex)
+        for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+    def verifyTopicExchange(self, ex):
+        """Verify that ex behaves like a topic exchange"""
+        self.queue_declare(queue="a")
+        self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*")
+        q = self.consume("a")
+        self.assertPublishGet(q, ex, "a.b.x")
+        self.assertPublishGet(q, ex, "a.x.b.x")
+        self.assertPublishGet(q, ex, "a.x.x.b.x")
+        # Shouldn't match
+        self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))        
+        self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))        
+        self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))        
+        self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+        self.assert_(q.empty())
+
+    def verifyHeadersExchange(self, ex):
+        """Verify that ex is a headers exchange"""
+        self.queue_declare(queue="q")
+        self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+        q = self.consume("q")
+        headers = {"name":"fred", "age":3}
+        self.assertPublishGet(q, exchange=ex, properties=headers)
+        self.session.message_transfer(destination=ex) # No headers, won't deliver
+        self.assertEmpty(q);                 
+        
+
+class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier):
+    """
+    The server SHOULD implement these standard exchange types: topic, headers.
+    
+    Client attempts to declare an exchange with each of these standard types.
+    """
+
+    def testDirect(self):
+        """Declare and test a direct exchange"""
+        self.exchange_declare(0, exchange="d", type="direct")
+        self.verifyDirectExchange("d")
+
+    def testFanout(self):
+        """Declare and test a fanout exchange"""
+        self.exchange_declare(0, exchange="f", type="fanout")
+        self.verifyFanOutExchange("f")
+
+    def testTopic(self):
+        """Declare and test a topic exchange"""
+        self.exchange_declare(0, exchange="t", type="topic")
+        self.verifyTopicExchange("t")
+
+    def testHeaders(self):
+        """Declare and test a headers exchange"""
+        self.exchange_declare(0, exchange="h", type="headers")
+        self.verifyHeadersExchange("h")
+        
+
+class RequiredInstancesRuleTests(TestHelper, StandardExchangeVerifier):
+    """
+    The server MUST, in each virtual host, pre-declare an exchange instance
+    for each standard exchange type that it implements, where the name of the
+    exchange instance is amq. followed by the exchange type name.
+    
+    Client creates a temporary queue and attempts to bind to each required
+    exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+    those types are defined).
+    """
+    def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+    def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+    def testAmqTopic(self):  self.verifyTopicExchange("amq.topic")
+        
+    def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestHelper, StandardExchangeVerifier):
+    """
+    The server MUST predeclare a direct exchange to act as the default exchange
+    for content Publish methods and for default queue bindings.
+    
+    Client checks that the default exchange is active by specifying a queue
+    binding with no exchange name, and publishing a message with a suitable
+    routing key but without specifying the exchange name, then ensuring that
+    the message arrives in the queue correctly.
+    """
+    def testDefaultExchange(self):
+        # Test automatic binding by queue name.
+        self.queue_declare(queue="d")
+        self.assertPublishConsume(queue="d", routing_key="d")
+        # Test explicit bind to default queue
+        self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestHelper):
+    """
+    The server MUST NOT allow clients to access the default exchange except
+    by specifying an empty exchange name in the Queue.Bind and content Publish
+    methods.
+
+    No test methods are implemented in this class yet.
+    """
+
+class ExtensionsRuleTests(TestHelper):
+    """
+    The server MAY implement other exchange types as wanted.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeclareMethodMinimumRuleTests(TestHelper):
+    """
+    The server SHOULD support a minimum of 16 exchanges per virtual host and
+    ideally, impose no limit except as defined by available resources.
+
+    The client creates as many exchanges as it can until the server reports
+    an error; the number of exchanges successfully created must be at least
+    sixteen.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestHelper):
+    """
+    The client MUST provide a valid access ticket giving "active" access to
+    the realm in which the exchange exists or will be created, or "passive"
+    access if the if-exists flag is set.
+
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestHelper):
+    """
+    Exchange names starting with "amq." are reserved for predeclared and
+    standardised exchanges. The client MUST NOT attempt to create an exchange
+    starting with "amq.".
+    
+    Similarly, exchanges starting with "qpid." are reserved for Qpid
+    implementation-specific system exchanges (such as the management exchange).
+    The client must not attempt to create an exchange starting with the string
+    "qpid.".
+    """
+    def template(self, reservedString, exchangeType):
+        try:
+            self.session.exchange_declare(exchange=reservedString, type=exchangeType)
+            self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".")
+        except SessionException, e:
+            self.assertEquals(e.args[0].error_code, 530)
+        # connection closed, reopen it
+        self.tearDown()
+        self.setUp()
+        try:
+            self.session.exchange_declare(exchange=reservedString + "abc123", type=exchangeType)
+            self.fail("Expected not allowed error (530) for exchanges starting with \"" + reservedString + "\".")
+        except SessionException, e:
+            self.assertEquals(e.args[0].error_code, 530)
+        # connection closed, reopen it
+        self.tearDown()
+        self.setUp()
+        # The following should be legal:
+        self.session.exchange_declare(exchange=reservedString[:-1], type=exchangeType)
+        self.session.exchange_delete(exchange=reservedString[:-1])
+        self.session.exchange_declare(exchange=reservedString[1:], type=exchangeType)
+        self.session.exchange_delete(exchange=reservedString[1:])
+        self.session.exchange_declare(exchange="." + reservedString, type=exchangeType)
+        self.session.exchange_delete(exchange="." + reservedString)
+        self.session.exchange_declare(exchange="abc." + reservedString, type=exchangeType)
+        self.session.exchange_delete(exchange="abc." + reservedString)
+        self.session.exchange_declare(exchange="abc." + reservedString + "def", type=exchangeType)
+        self.session.exchange_delete(exchange="abc." + reservedString + "def")
+
+    def test_amq(self):
+        self.template("amq.", "direct")
+        self.template("amq.", "topic")
+        self.template("amq.", "fanout")
+
+    def test_qpid(self):
+        self.template("qpid.", "direct")
+        self.template("qpid.", "topic")
+        self.template("qpid.", "fanout")
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestHelper):
+    """
+    Exchanges cannot be redeclared with different types.  The client MUST NOT
+    attempt to redeclare an existing exchange with a different type than used
+    in the original Exchange.Declare method.
+
+    No test methods are implemented in this class yet.
+    NOTE(review): MiscellaneousErrorsTests.testDifferentDeclaredType appears
+    to exercise this rule.
+    """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestHelper):
+    """
+    The client MUST NOT attempt to create an exchange with a type that the
+    server does not support.
+
+    No test methods are implemented in this class yet.
+    NOTE(review): MiscellaneousErrorsTests.testTypeNotKnown appears to
+    exercise this rule.
+    """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestHelper):
+    """
+    If set, and the exchange does not already exist, the server MUST raise a
+    channel exception with reply code 404 (not found).    
+    """
+    def test(self):
+        try:
+            self.session.exchange_declare(exchange="humpty_dumpty", passive=True)
+            self.fail("Expected 404 for passive declaration of unknown exchange.")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestHelper):
+    """
+    The server MUST support both durable and transient exchanges.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestHelper):
+    """
+    The server MUST ignore the durable field if the exchange already exists.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestHelper):
+    """
+    The server MUST ignore the auto-delete field if the exchange already
+    exists.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestHelper):
+    """
+    The client MUST provide a valid access ticket giving "active" access
+    rights to the exchange's access realm.
+
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestHelper):
+    """
+    The client MUST NOT attempt to delete an exchange that does not exist.
+
+    No test methods are implemented in this class yet.
+    """
+
+
+class HeadersExchangeTests(TestHelper):
+    """
+    Tests for headers exchange functionality.
+    """
+    def setUp(self):
+        TestHelper.setUp(self)
+        self.queue_declare(queue="q")
+        self.q = self.consume("q")
+
+    def myAssertPublishGet(self, headers):
+        self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
+
+    def myBasicPublish(self, headers):
+        mp=self.session.message_properties(application_headers=headers)
+        self.session.message_transfer(destination="amq.match", message=Message(mp, "foobar"))
+        
+    def testMatchAll(self):
+        self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+        
+        # None of these should match
+        self.myBasicPublish({})
+        self.myBasicPublish({"name":"barney"})
+        self.myBasicPublish({"name":10})
+        self.myBasicPublish({"name":"fred", "age":2})
+        self.assertEmpty(self.q)
+
+    def testMatchAny(self):
+        self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred"})
+        self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+        self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+        # Wont match
+        self.myBasicPublish({})
+        self.myBasicPublish({"irrelevant":0})
+        self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestHelper):
+    """
+    Test some miscellaneous error conditions
+    """
+    def testTypeNotKnown(self):
+        try:
+            self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+            self.fail("Expected 503 for declaration of unknown exchange type.")
+        except SessionException, e:
+            self.assertEquals(503, e.args[0].error_code)
+
+    def testDifferentDeclaredType(self):
+        self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+        try:
+            session = self.conn.session("alternate", 2)
+            session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+            self.fail("Expected 530 for redeclaration of exchange with different type.")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+    
+class ExchangeTests(TestHelper):
+    def testHeadersBindNoMatchArg(self):
+        self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
+        try: 
+            self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
+            self.fail("Expected failure for missing x-match arg.")
+        except SessionException, e:    
+            self.assertEquals(541, e.args[0].error_code)

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org