You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/24 16:08:33 UTC

svn commit: r559055 [1/2] - in /incubator/qpid/trunk/qpid/python: ./ qpid/ tests/ tests_0-10/ tests_0-9/

Author: gsim
Date: Tue Jul 24 07:08:32 2007
New Revision: 559055

URL: http://svn.apache.org/viewvc?view=rev&rev=559055
Log:
Some initial 0-10 support including placeholders for new domains, use of execution layer for synchronising methods with no explicit responses and a new set of tests (mainly just copies of the 0-9 ones, but these will be altered as 0-10 support progresses).


Added:
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/
    incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/basic.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/broker.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/example.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/execution.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/query.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/testlib.py   (with props)
    incubator/qpid/trunk/qpid/python/tests_0-10/tx.py   (with props)
Modified:
    incubator/qpid/trunk/qpid/python/qpid/codec.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/spec.py
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests/codec.py
    incubator/qpid/trunk/qpid/python/tests_0-9/broker.py

Added: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (added)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Tue Jul 24 07:08:32 2007
@@ -0,0 +1,4 @@
+tests_0-10.message.MessageTests.test_checkpoint
+tests_0-10.message.MessageTests.test_reject
+tests_0-10.basic.BasicTests.test_get
+

Propchange: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?view=diff&rev=559055&r1=559054&r2=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Tue Jul 24 07:08:32 2007
@@ -258,17 +258,9 @@
     codec = Codec(enc)
     if tbl:
       for key, value in tbl.items():
-        # Field names MUST start with a letter, '$' or '#' and may
-        # continue with letters, '$' or '#', digits, or underlines, to
-        # a maximum length of 128 characters.
-
         if len(key) > 128:
           raise ValueError("field table key too long: '%s'" % key)
 
-        m = Codec.KEY_CHECK.match(key)
-        if m == None or m.end() != len(key):
-          raise ValueError("invalid field table key: '%s'" % key)
-
         codec.encode_shortstr(key)
         if isinstance(value, basestring):
           codec.write("S")
@@ -338,3 +330,25 @@
       return self.decode_longstr()
     else:
       return ReferenceId(self.decode_longstr())
+
+  # new domains for 0-10:
+  
+  def encode_uuid(self, s):
+    self.encode_longstr(s)
+
+  def decode_uuid(self):
+    return self.decode_longstr()
+
+  def encode_rfc1982_long(self, s):
+    self.encode_long(s)
+
+  def decode_rfc1982_long(self):
+    return self.decode_long()
+
+  #Not done yet
+  def encode_rfc1982_long_set(self, s):
+    self.encode_short(0)
+
+  def decode_rfc1982_long_set(self):
+    self.decode_short()
+    return 0;

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=559055&r1=559054&r2=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Tue Jul 24 07:08:32 2007
@@ -189,7 +189,9 @@
     self.completion = ExecutionCompletion()
 
     # Use reliable framing if version == 0-9.
-    self.reliable = (spec.major == 0 and spec.minor == 9)
+    #    (also for 0-10 while transitioning...)
+    self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10))
+    self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
     self.synchronous = True
 
   def close(self, reason):
@@ -199,6 +201,7 @@
     self.reason = reason
     self.incoming.close()
     self.responses.close()
+    self.completion.close()
 
   def write(self, frame, content = None):
     if self.closed:
@@ -261,6 +264,11 @@
       
       self.request(frame, self.queue_response, content)
       if not frame.method.responses:
+        if self.use_execution_layer and type.klass.name != "execution":
+          self.execution_flush()
+          self.completion.wait()                  
+          if self.closed:
+            raise Closed(self.reason)
         return None
       try:
         resp = self.responses.get()
@@ -352,6 +360,9 @@
     #the following test is a hack until the track/sub-channel is available
     if method.klass.name != "execution":
       self.command_id = self.sequence.next()
+
+  def close(self):
+    self.completed.set()    
 
   def complete(self, mark):
     self.mark = mark

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=559055&r1=559054&r2=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Tue Jul 24 07:08:32 2007
@@ -237,7 +237,11 @@
               "long": 0,
               "longlong": 0,
               "timestamp": 0,
-              "content": None}
+              "content": None,
+              "uuid": "",
+              "rfc1982_long": 0,
+              "rfc1982_long_set": 0
+              }
 
   def define_method(self, name):
     g = {Method.METHOD: self}
@@ -306,7 +310,9 @@
   for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata):
     # constants
     for nd in root.query["constant"]:
-      const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]),
+      val = nd["@value"]
+      if val.startswith("0x"): continue
+      const = Constant(spec, pythonize(nd["@name"]), int(val),
                        nd["@class"], get_docs(nd))
       try:
         spec.constants.add(const)

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=559055&r1=559054&r2=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Tue Jul 24 07:08:32 2007
@@ -125,6 +125,8 @@
             self.tests=findmodules("tests")
             if self.use08spec():
                 self.tests+=findmodules("tests_0-8")
+            elif self.spec.major == 0 and self.spec.minor == 10:    
+                self.tests+=findmodules("tests_0-10")
             else:
                 self.tests+=findmodules("tests_0-9")
 
@@ -214,10 +216,10 @@
             
     def exchange_declare(self, channel=None, ticket=0, exchange='',
                          type='', passive=False, durable=False,
-                         auto_delete=False, internal=False, nowait=False,
+                         auto_delete=False,
                          arguments={}):
         channel = channel or self.channel
-        reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
+        reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments)
         self.exchanges.append((channel,exchange))
         return reply
 

Modified: incubator/qpid/trunk/qpid/python/tests/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/codec.py?view=diff&rev=559055&r1=559054&r2=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/codec.py Tue Jul 24 07:08:32 2007
@@ -432,13 +432,6 @@
         """
         self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')
 
-    # -------------------------------------------
-    def test_field_table_invalid_field_name(self):
-        """
-        invalid field name
-        """
-        self.failUnlessRaises(Exception, self.codec.encode_table, {'1key1':'value1'})
-
     # ----------------------------------------------------
     def test_field_table_invalid_field_name_length(self):
         """

Added: incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,20 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class AlternateExchangeTests(TestBase):
+    """
+    Tests for the new mechanism for message returns introduced in 0-10
+    and available in 0-9 for preview
+    """
+
+    def test_unroutable(self):
+        """
+        Test that unroutable messages are delivered to the alternate-exchange if specified
+        """
+        channel = self.channel
+        #create an exchange with an alternate defined
+        channel.exchange_declare(exchange="secondary", type="fanout")
+        channel.exchange_declare(exchange="primary", type="direct", alternate_exchange="secondary")
+
+        #declare, bind (to the alternate exchange) and consume from a queue for 'returned' messages
+        channel.queue_declare(queue="returns", exclusive=True)
+        channel.queue_bind(queue="returns", exchange="secondary")
+        channel.message_consume(destination="a", queue="returns")
+        returned = self.client.queue("a")
+
+        #declare, bind (to the primary exchange) and consume from a queue for 'processed' messages
+        channel.queue_declare(queue="processed", exclusive=True)
+        channel.queue_bind(queue="processed", exchange="primary", routing_key="my-key")
+        channel.message_consume(destination="b", queue="processed")
+        processed = self.client.queue("b")
+
+        #publish to the primary exchange
+        #...one message that makes it to the 'processed' queue:
+        channel.message_transfer(destination="primary", routing_key="my-key", body="Good")
+        #...and one that does not:
+        channel.message_transfer(destination="primary", routing_key="unused-key", body="Bad")
+
+        #delete the exchanges
+        channel.exchange_delete(exchange="primary")
+        channel.exchange_delete(exchange="secondary")
+
+        #verify behaviour
+        self.assertEqual("Good", processed.get(timeout=1).body)
+        self.assertEqual("Bad", returned.get(timeout=1).body)
+        self.assertEmpty(processed)
+        self.assertEmpty(returned)
+
+    def test_queue_delete(self):
+        """
+        Test that messages in a queue being deleted are delivered to the alternate-exchange if specified
+        """
+        channel = self.channel
+        #set up a 'dead letter queue':
+        channel.exchange_declare(exchange="dlq", type="fanout")
+        channel.queue_declare(queue="deleted", exclusive=True)
+        channel.queue_bind(exchange="dlq", queue="deleted")
+        channel.message_consume(destination="dlq", queue="deleted")
+        dlq = self.client.queue("dlq")
+
+        #create a queue using the dlq as its alternate exchange:
+        channel.queue_declare(queue="delete-me", alternate_exchange="dlq")
+        #send it some messages:
+        channel.message_transfer(routing_key="delete-me", body="One")
+        channel.message_transfer(routing_key="delete-me", body="Two")
+        channel.message_transfer(routing_key="delete-me", body="Three")
+        #delete it:
+        channel.queue_delete(queue="delete-me")
+        #delete the dlq exchange:
+        channel.exchange_delete(exchange="dlq")
+
+        #check the messages were delivered to the dlq:
+        self.assertEqual("One", dlq.get(timeout=1).body)
+        self.assertEqual("Two", dlq.get(timeout=1).body)
+        self.assertEqual("Three", dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+
+
+    def test_immediate(self):
+        """
+        Test that messages sent with the immediate flag to a queue with no consumers are delivered to the alternate-exchange if specified
+        """
+        channel = self.channel
+        #set up a 'dead letter queue':
+        channel.exchange_declare(exchange="dlq", type="fanout")
+        channel.queue_declare(queue="immediate", exclusive=True)
+        channel.queue_bind(exchange="dlq", queue="immediate")
+        channel.message_consume(destination="dlq", queue="immediate")
+        dlq = self.client.queue("dlq")
+
+        #create a queue using the dlq as its alternate exchange:
+        channel.queue_declare(queue="no-consumers", alternate_exchange="dlq", exclusive=True)
+        #send it some messages:
+        channel.message_transfer(routing_key="no-consumers", body="no one wants me", immediate=True)
+
+        #check the messages were delivered to the dlq:
+        self.assertEqual("no one wants me", dlq.get(timeout=1).body)
+        self.assertEmpty(dlq)
+
+        #cleanup:
+        channel.queue_delete(queue="no-consumers")
+        channel.exchange_delete(exchange="dlq")
+
+
+    def test_delete_while_used_by_queue(self):
+        """
+        Ensure an exchange still in use as an alternate-exchange for a
+        queue can't be deleted
+        """
+        channel = self.channel
+        channel.exchange_declare(exchange="alternate", type="fanout")
+        channel.queue_declare(queue="q", exclusive=True, alternate_exchange="alternate")
+        try:
+            channel.exchange_delete(exchange="alternate")
+            self.fail("Expected deletion of in-use alternate-exchange to fail")
+        except Closed, e:
+            #cleanup:
+            other = self.connect()
+            channel = other.channel(1)
+            channel.channel_open()
+            channel.exchange_delete(exchange="alternate")
+            channel.channel_close(200, "ok")
+            other.close()
+            
+            self.assertConnectionException(530, e.args[0])            
+
+
+
+    def test_delete_while_used_by_exchange(self):
+        """
+        Ensure an exchange still in use as an alternate-exchange for 
+        another exchange can't be deleted
+        """
+        channel = self.channel
+        channel.exchange_declare(exchange="alternate", type="fanout")
+        channel.exchange_declare(exchange="e", type="fanout", alternate_exchange="alternate")
+        try:
+            channel.exchange_delete(exchange="alternate")
+            #cleanup:
+            channel.exchange_delete(exchange="e")
+            self.fail("Expected deletion of in-use alternate-exchange to fail")
+        except Closed, e:
+            #cleanup:
+            other = self.connect()
+            channel = other.channel(1)
+            channel.channel_open()
+            channel.exchange_delete(exchange="e")
+            channel.exchange_delete(exchange="alternate")
+            channel.channel_close(200, "ok")
+            other.close()
+
+            self.assertConnectionException(530, e.args[0])
+            
+
+    def assertEmpty(self, queue):
+        try:
+            msg = queue.get(timeout=1) 
+            self.fail("Queue not empty: " + msg)
+        except Empty: None
+

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/alternate-exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/basic.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/basic.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/basic.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BasicTests(TestBase):
+    """Tests for 'methods' on the amqp basic 'class'"""
+
+    def test_consume_no_local(self):
+        """
+        Test that the no_local flag is honoured in the consume method
+        """
+        channel = self.channel
+        #setup, declare two queues:
+        channel.queue_declare(queue="test-queue-1a", exclusive=True)
+        channel.queue_declare(queue="test-queue-1b", exclusive=True)
+        #establish two consumers one of which excludes delivery of locally sent messages
+        channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+        channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+        #send a message
+        channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
+        channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+        #check the queues of the two consumers
+        excluded = self.client.queue("local_excluded")
+        included = self.client.queue("local_included")
+        msg = included.get(timeout=1)
+        self.assertEqual("consume_no_local", msg.content.body)
+        try:
+            excluded.get(timeout=1) 
+            self.fail("Received locally published message though no_local=true")
+        except Empty: None
+
+
+    def test_consume_exclusive(self):
+        """
+        Test that the exclusive flag is honoured in the consume method
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+        #check that an exclusive consumer prevents other consumer being created:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+        try:
+            channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+            self.fail("Expected consume request to fail due to previous exclusive consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+        #open new channel and cleanup last consumer:    
+        channel = self.client.channel(2)
+        channel.channel_open()
+
+        #check that an exclusive consumer cannot be created if a consumer already exists:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+        try:
+            channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+            self.fail("Expected exclusive consume request to fail due to previous consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+    def test_consume_queue_errors(self):
+        """
+        Test error conditions associated with the queue field of the consume method:
+        """
+        channel = self.channel
+        try:
+            #queue specified but doesn't exist:
+            channel.basic_consume(queue="invalid-queue")
+            self.fail("Expected failure when consuming from non-existent queue")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        channel = self.client.channel(2)
+        channel.channel_open()
+        try:
+            #queue not specified and none previously declared for channel:
+            channel.basic_consume(queue="")
+            self.fail("Expected failure when consuming from unspecified queue")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_consume_unique_consumers(self):
+        """
+        Ensure unique consumer tags are enforced
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+        #check that attempts to use duplicate tags are detected and prevented:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+        try:
+            channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+            self.fail("Expected consume request to fail due to non-unique tag")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_cancel(self):
+        """
+        Test compliance of the basic.cancel method
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-4", exclusive=True)
+        channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
+
+        myqueue = self.client.queue("my-consumer")
+        msg = myqueue.get(timeout=1)
+        self.assertEqual("One", msg.content.body)
+	
+        #cancel should stop messages being delivered
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
+        try:
+            msg = myqueue.get(timeout=1) 
+            self.fail("Got message after cancellation: " + msg)
+        except Empty: None
+
+        #cancellation of non-existent consumers should be handled without error
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+    def test_ack(self):
+        """
+        Test basic ack/recover behaviour
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-ack-queue", exclusive=True)
+        
+        reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+        queue = self.client.queue(reply.consumer_tag)
+
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_recover(requeue=False)
+        
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.content.body)
+        except Empty: None
+
+    def test_recover_requeue(self):
+        """
+        Test requeuing on recovery
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-requeue", exclusive=True)
+        
+        subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+
+        channel.basic_recover(requeue=True)
+
+        subscription2 = channel.basic_consume(queue="test-requeue")
+        queue2 = self.client.queue(subscription2.consumer_tag)
+        
+        msg3b = queue2.get(timeout=1)
+        msg5b = queue2.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        self.assertEqual(True, msg3b.redelivered)
+        self.assertEqual(True, msg5b.redelivered)
+
+        try:
+            extra = queue2.get(timeout=1)
+            self.fail("Got unexpected message in second queue: " + extra.content.body)
+        except Empty: None
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in original queue: " + extra.content.body)
+        except Empty: None
+        
+        
+    def test_qos_prefetch_count(self):
+        """
+        Test that the prefetch count specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 5:
+        channel.basic_qos(prefetch_count=5)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered:
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+
+        
+    def test_qos_prefetch_size(self):
+        """
+        Test that the prefetch size specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+        channel.basic_qos(prefetch_size=50)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered (i.e. 45 bytes worth):
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #make sure that a single oversized message still gets delivered
+        large = "abcdefghijklmnopqrstuvwxyz"
+        large = large + "-" + large;
+        channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+        msg = queue.get(timeout=1)
+        self.assertEqual(large, msg.content.body)
+
+    def test_get(self):
+        """
+        Test basic_get method
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-get", exclusive=True)
+        
+        #publish some messages (to be fetched with no_ack=True)
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        #use basic_get to read back the messages, and check that we get an empty at the end
+        for i in range(1, 11):
+            reply = channel.basic_get(no_ack=True)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        #repeat for no_ack=False
+        for i in range(11, 21):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        for i in range(11, 21):
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            if(i == 13):
+                channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
+            if(i in [15, 17, 19]):
+                channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        #recover(requeue=True)
+        channel.basic_recover(requeue=True)
+        
+        #get the unacked messages again (14, 16, 18, 20)
+        for i in [14, 16, 18, 20]:
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        channel.basic_recover(requeue=True)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/basic.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class BrokerTests(TestBase):
+    """Tests for basic Broker functionality"""
+
+    def test_ack_and_no_ack(self):
+        """
+        First, this test tries to receive a message with a no-ack
+        consumer. Second, this test tries to explicitly receive and
+        acknowledge a message with an acknowledging consumer.
+        """
+        ch = self.channel
+        self.queue_declare(ch, queue = "myqueue")
+
+        # No ack consumer
+        ctag = "tag1"
+        ch.message_consume(queue = "myqueue", destination = ctag, no_ack = True)
+        body = "test no-ack"
+        ch.message_transfer(routing_key = "myqueue", body = body)
+        msg = self.client.queue(ctag).get(timeout = 5)
+        self.assert_(msg.body == body)
+
+        # Acknowledging consumer
+        self.queue_declare(ch, queue = "otherqueue")
+        ctag = "tag2"
+        ch.message_consume(queue = "otherqueue", destination = ctag, no_ack = False)
+        body = "test ack"
+        ch.message_transfer(routing_key = "otherqueue", body = body)
+        msg = self.client.queue(ctag).get(timeout = 5)
+        msg.ok()
+        self.assert_(msg.body == body)
+        
+    def test_simple_delivery_immediate(self):
+        """
+        Test simple message delivery where consume is issued before publish
+        """
+        channel = self.channel
+        self.exchange_declare(channel, exchange="test-exchange", type="direct")
+        self.queue_declare(channel, queue="test-queue") 
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        consumer_tag = "tag1"
+        channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
+        queue = self.client.queue(consumer_tag)
+
+        body = "Immediate Delivery"
+        channel.message_transfer(destination="test-exchange", routing_key="key", body=body, immediate=True)
+        msg = queue.get(timeout=5)
+        self.assert_(msg.body == body)
+
+        # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+    def test_simple_delivery_queued(self):
+        """
+        Test basic message delivery where publish is issued before consume
+        (i.e. requires queueing of the message)
+        """
+        channel = self.channel
+        self.exchange_declare(channel, exchange="test-exchange", type="direct")
+        self.queue_declare(channel, queue="test-queue")
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        body = "Queued Delivery"
+        channel.message_transfer(destination="test-exchange", routing_key="key", body=body)
+
+        consumer_tag = "tag1"
+        channel.message_consume(queue="test-queue", destination=consumer_tag, no_ack=True)
+        queue = self.client.queue(consumer_tag)
+        msg = queue.get(timeout=5)
+        self.assert_(msg.body == body)
+
+    def test_invalid_channel(self):
+        channel = self.client.channel(200)
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for invalid channel")
+        except Closed, e:
+            self.assertConnectionException(504, e.args[0])
+        
+    def test_closed_channel(self):
+        channel = self.client.channel(200)
+        channel.channel_open()
+        channel.channel_close()
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for closed channel")
+        except Closed, e:
+            if isinstance(e.args[0], str): self.fail(e)
+            self.assertConnectionException(504, e.args[0])
+
+    def test_channel_flow(self):
+        channel = self.channel
+        channel.queue_declare(queue="flow_test_queue", exclusive=True)
+        channel.message_consume(destination="my-tag", queue="flow_test_queue")
+        incoming = self.client.queue("my-tag")
+        
+        channel.channel_flow(active=False)        
+        channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz")
+        try:
+            incoming.get(timeout=1) 
+            self.fail("Received message when flow turned off.")
+        except Empty: None
+        
+        channel.channel_flow(active=True)
+        msg = incoming.get(timeout=1)
+        self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body)

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,587 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from struct import pack, unpack
+from time import sleep
+
+class DtxTests(TestBase):
+    """
+    Tests for the amqp dtx related classes.
+
+    Tests of the form test_simple_xxx test the basic transactional
+    behaviour. The approach here is to 'swap' a message from one queue
+    to another by consuming and re-publishing in the same
+    transaction. That transaction is then completed in different ways
+    and the appropriate result verified.
+
+    The other tests enforce more specific rules and behaviour on a
+    per-method or per-field basis.        
+    """
+
+    XA_RBROLLBACK = 1
+    XA_RBTIMEOUT = 2
+    XA_OK = 8
+
+    def test_simple_commit(self):
+        """        
+        Test basic one-phase commit behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "commit")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("commit", "queue-b")
+
+    def test_simple_prepare_commit(self):
+        """        
+        Test basic two-phase commit behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-commit")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("prepare-commit", "queue-b")
+
+
+    def test_simple_rollback(self):
+        """        
+        Test basic rollback behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "rollback")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("rollback", "queue-a")
+
+    def test_simple_prepare_rollback(self):
+        """        
+        Test basic rollback behaviour after the transaction has been prepared.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-rollback")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("prepare-rollback", "queue-a")    
+
+    def test_select_required(self):
+        """
+        check that an error is flagged if select is not issued before
+        start or end        
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx)
+            
+            #if we get here we have failed, but need to do some cleanup:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Channel not selected for use with dtx, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_already_known(self):
+        """
+        Verify that an attempt to start an association with a
+        transaction that is already known is not allowed (unless the
+        join flag is set).
+        """
+        #create two channels on different connections & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        other = self.connect()
+        channel2 = other.channel(1)
+        channel2.channel_open()
+        channel2.dtx_demarcation_select()
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other without the join set
+        failed = False
+        try:
+            channel2.dtx_demarcation_start(xid=tx)
+        except Closed, e:
+            failed = True
+            error = e
+
+        #cleanup:
+        if not failed:
+            channel2.dtx_demarcation_end(xid=tx)
+            other.close()
+        channel1.dtx_demarcation_end(xid=tx)
+        channel1.dtx_coordination_rollback(xid=tx)
+        
+        #verification:
+        if failed: self.assertConnectionException(503, e.args[0])
+        else: self.fail("Xid already known, expected exception!")                    
+
+    def test_forget_xid_on_completion(self):
+        """
+        Verify that a xid is 'forgotten' - and can therefore be used
+        again - once it is completed.
+        """
+        channel = self.channel
+        #do some transactional work & complete the transaction
+        self.test_simple_commit()
+        
+        #start association for the same xid as the previously completed txn
+        tx = self.xid("my-xid")
+        channel.dtx_demarcation_start(xid=tx)
+        channel.dtx_demarcation_end(xid=tx)
+        channel.dtx_coordination_rollback(xid=tx)
+
+    def test_start_join_and_resume(self):
+        """
+        Ensure the correct error is signalled when both the join and
+        resume flags are set on starting an association between a
+        channel and a transaction.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+            #failed, but need some cleanup:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Join and resume both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_join(self):
+        """        
+        Verify 'join' behaviour, where a channel is associated with a
+        transaction that is already associated with another channel.        
+        """
+        #create two channels & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        channel2 = self.client.channel(2)
+        channel2.channel_open()
+        channel2.dtx_demarcation_select()
+
+        #setup
+        channel1.queue_declare(queue="one", exclusive=True)
+        channel1.queue_declare(queue="two", exclusive=True)
+        channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+        channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other with the join flag set
+        channel2.dtx_demarcation_start(xid=tx, join=True)
+
+        #do work through each channel
+        self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
+        self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+
+        #mark end on both channels
+        channel1.dtx_demarcation_end(xid=tx)
+        channel2.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+        
+
+    def test_suspend_resume(self):
+        """
+        Test suspension and resumption of an association
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+
+        #setup
+        channel.queue_declare(queue="one", exclusive=True)
+        channel.queue_declare(queue="two", exclusive=True)
+        channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+        channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+        channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+        channel.dtx_demarcation_start(xid=tx, resume=True)
+        self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+        channel.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_end_suspend_and_fail(self):
+        """        
+        Verify that the correct error is signalled if the suspend and
+        fail flags are both set when disassociating a transaction from
+        the channel        
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("suspend_and_fail")
+        channel.dtx_demarcation_start(xid=tx)
+        try:
+            channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+            self.fail("Suspend and fail both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+        #cleanup    
+        other = self.connect()
+        channel = other.channel(1)
+        channel.channel_open()
+        channel.dtx_coordination_rollback(xid=tx)
+        channel.channel_close()
+        other.close()
+    
+
+    def test_end_unknown_xid(self):
+        """        
+        Verifies that the correct exception is thrown when an attempt
+        is made to end the association for a xid not previously
+        associated with the channel
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("unknown-xid")
+        try:
+            channel.dtx_demarcation_end(xid=tx)
+            self.fail("Attempted to end association with unknown xid, expected exception!")
+        except Closed, e:
+            #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
+            self.assertConnectionException(503, e.args[0])
+
+    def test_end(self):
+        """
+        Verify that the association is terminated by end and subsequent
+        operations are non-transactional        
+        """
+        channel = self.client.channel(2)
+        channel.channel_open()
+        channel.queue_declare(queue="tx-queue", exclusive=True)
+
+        #publish a message under a transaction
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        channel.dtx_demarcation_start(xid=tx)
+        channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")        
+        channel.dtx_demarcation_end(xid=tx)
+
+        #now that association with txn is ended, publish another message
+        channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
+
+        #check the second message is available, but not the first
+        self.assertMessageCount(1, "tx-queue")
+        channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
+        msg = self.client.queue("results").get(timeout=1)
+        self.assertEqual("two", msg.message_id)
+        channel.message_cancel(destination="results")
+        #ack the message then close the channel
+        msg.ok()
+        channel.channel_close()
+
+        channel = self.channel        
+        #commit the transaction and check that the first message (and
+        #only the first message) is then delivered
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "tx-queue")
+        self.assertMessageId("one", "tx-queue")
+
+    def test_invalid_commit_one_phase_true(self):
+        """
+        Test that a commit with one_phase = True is rejected if the
+        transaction in question has already been prepared.        
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.channel_open()
+        tester.queue_declare(queue="dummy", exclusive=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(routing_key="dummy", body="whatever")
+        tester.dtx_demarcation_end(xid=tx)
+        tester.dtx_coordination_prepare(xid=tx)
+        failed = False
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=True)
+        except Closed, e:
+            failed = True
+            error = e
+
+        if failed:
+            self.channel.dtx_coordination_rollback(xid=tx)
+            self.assertConnectionException(503, e.args[0])
+        else:
+            tester.channel_close()
+            other.close()
+            self.fail("Invalid use of one_phase=True, expected exception!")
+
+    def test_invalid_commit_one_phase_false(self):
+        """
+        Test that a commit with one_phase = False is rejected if the
+        transaction in question has not yet been prepared.        
+        """
+        """
+        The setup mirrors test_invalid_commit_one_phase_true, except
+        that the transaction is deliberately not prepared before commit.
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.channel_open()
+        tester.queue_declare(queue="dummy", exclusive=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(routing_key="dummy", body="whatever")
+        tester.dtx_demarcation_end(xid=tx)
+        failed = False
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=False)
+        except Closed, e:
+            failed = True
+            error = e
+
+        if failed:
+            self.channel.dtx_coordination_rollback(xid=tx)
+            self.assertConnectionException(503, e.args[0])
+        else:
+            tester.channel_close()
+            other.close()
+            self.fail("Invalid use of one_phase=False, expected exception!")
+
+    def test_implicit_end(self):
+        """
+        Test that an association is implicitly ended when the channel
+        is closed (whether by exception or explicit client request)
+        and the transaction in question is marked as rollback only.
+        """
+        channel1 = self.channel
+        channel2 = self.client.channel(2)
+        channel2.channel_open()
+
+        #setup:
+        channel2.queue_declare(queue="dummy", exclusive=True)
+        channel2.message_transfer(routing_key="dummy", body="whatever")
+        tx = self.xid("dummy")
+
+        channel2.dtx_demarcation_select()
+        channel2.dtx_demarcation_start(xid=tx)
+        channel2.message_get(queue="dummy", destination="dummy")
+        self.client.queue("dummy").get(timeout=1).ok()
+        channel2.message_transfer(routing_key="dummy", body="whatever")
+        channel2.channel_close()
+
+        self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
+        channel1.dtx_coordination_rollback(xid=tx)
+
+    def test_get_timeout(self):
+        """        
+        Check that get-timeout returns the correct value, (and that a
+        transaction with a timeout can complete normally)        
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+        self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+        self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)        
+        
+    def test_set_timeout(self):
+        """        
+        Test that the timeout of a transaction results in the expected
+        behaviour        
+        """
+        #open new channel to allow self.channel to be used in checking the queue
+        channel = self.client.channel(2)
+        channel.channel_open()
+        #setup:
+        tx = self.xid("dummy")
+        channel.queue_declare(queue="queue-a", exclusive=True)
+        channel.queue_declare(queue="queue-b", exclusive=True)
+        channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "queue-a", "queue-b")
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+        sleep(3)
+        #check that the work has been rolled back already
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("timeout", "queue-a")
+        #check the correct codes are returned when we try to complete the txn
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)        
+
+
+
+    def test_recover(self):
+        """
+        Test basic recover behaviour
+        """
+        channel = self.channel
+
+        channel.dtx_demarcation_select()
+        channel.queue_declare(queue="dummy", exclusive=True)
+
+        prepared = []
+        for i in range(1, 10):
+            tx = self.xid("tx%s" % (i))
+            channel.dtx_demarcation_start(xid=tx)
+            channel.message_transfer(routing_key="dummy", body="message%s" % (i))
+            channel.dtx_demarcation_end(xid=tx)
+            if i in [2, 5, 6, 8]:
+                channel.dtx_coordination_prepare(xid=tx)
+                prepared.append(tx)
+            else:    
+                channel.dtx_coordination_rollback(xid=tx)
+
+        indoubt = channel.dtx_coordination_recover().in_doubt
+        #convert indoubt table to a list of xids (note: this will change for 0-10)
+        data = indoubt["xids"]
+        xids = []
+        pos = 0
+        while pos < len(data):
+            size = unpack("!B", data[pos])[0]
+            start = pos + 1
+            end = start + size
+            xid = data[start:end]
+            xids.append(xid)
+            pos = end
+        
+        #rollback the prepared transactions returned by recover
+        for x in xids:
+            channel.dtx_coordination_rollback(xid=x)            
+
+        #validate against the expected list of prepared transactions
+        actual = set(xids)
+        expected = set(prepared)
+        intersection = actual.intersection(expected)
+        
+        if intersection != expected:
+            missing = expected.difference(actual)
+            extra = actual.difference(expected)
+            for x in missing:
+                channel.dtx_coordination_rollback(xid=x)            
+            self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+    def xid(self, txid, branchqual = ''):
+        return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+        
+    def txswap(self, tx, id):
+        channel = self.channel
+        #declare two queues:
+        channel.queue_declare(queue="queue-a", exclusive=True)
+        channel.queue_declare(queue="queue-b", exclusive=True)
+        #put message with specified id on one queue:
+        channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage")
+
+        #start the transaction:
+        channel.dtx_demarcation_select()        
+        self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
+
+        #'swap' the message from one queue to the other, under that transaction:
+        self.swap(self.channel, "queue-a", "queue-b")
+
+        #mark the end of the transactional work:
+        self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+
+    def swap(self, channel, src, dest):
+        #consume from src:
+        channel.message_get(destination="temp-swap", queue=src)
+        msg = self.client.queue("temp-swap").get(timeout=1)
+        msg.ok();        
+
+        #re-publish to dest
+        channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)        
+
+    def assertMessageCount(self, expected, queue):
+        self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count)
+
+    def assertMessageId(self, expected, queue):
+        self.channel.message_consume(queue=queue, destination="results", no_ack=True)
+        self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
+        self.channel.message_cancel(destination="results")

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/example.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/example.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/example.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExampleTest (TestBase):
+    """
+    An example Qpid test, illustrating the unittest framework and the
+    python Qpid client. The test class inherits TestBase.  The
+    test code uses the Qpid client to interact with a qpid broker and
+    verify it behaves as expected.
+    """
+
+    def test_example(self):
+        """
+        An example test. Note that test functions must start with 'test_'
+        to be recognized by the test framework.
+        """
+
+        # By inheriting TestBase, self.client is automatically connected
+        # and self.channel is automatically opened as channel(1)
+        # Other channel methods mimic the protocol.
+        channel = self.channel
+
+        # Now we can send regular commands. If you want to see what the method
+        # arguments mean or what other commands are available, you can use the
+        # python builtin help() method. For example:
+        #help(channel)
+        #help(channel.exchange_declare)
+
+        # If you want to browse the available protocol methods without being
+        # connected to a live server you can use the amqp-doc utility:
+        #
+        #   Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+        #
+        #   Options:
+        #       -e, --regexp    use regex instead of glob when matching
+
+        # Now that we know what commands are available we can use them to
+        # interact with the server.
+
+        # Here we use ordinal arguments.
+        self.exchange_declare(channel, 0, "test", "direct")
+
+        # Here we use keyword arguments.
+        self.queue_declare(channel, queue="test-queue")
+        channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+        # Call channel.message_consume to register as a consumer.
+        # All the protocol methods return a message object. The message object
+        # has fields corresponding to the reply method fields, plus a content
+        # field that is filled if the reply includes content. The return
+        # value is not needed here; what matters is the name passed as
+        # destination ("consumer_tag"), which is used below to look up the
+        # local queue for this subscription.
+        channel.message_consume(queue="test-queue", destination="consumer_tag")
+
+        # We can use the Client.queue(...) method to access the queue
+        # corresponding to our consumer_tag.
+        queue = self.client.queue("consumer_tag")
+
+        # Now let's publish a message and see if our consumer gets it. The
+        # body is passed directly as the body argument of message_transfer.
+        body = "Hello World!"
+        channel.message_transfer(destination="test",
+                                 routing_key="key",
+                                 body = body)
+
+        # Now we'll wait for the message to arrive. We can use the timeout
+        # argument in case the server hangs. By default queue.get() will wait
+        # until a message arrives or the connection to the server dies.
+        msg = queue.get(timeout=10)
+
+        # And check that we got the right response with assertEqual
+        self.assertEqual(body, msg.body)
+
+        # Now acknowledge the message.
+        msg.ok()
+

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/example.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+    """Verifies standard exchange behavior.
+
+    Used as base class for classes that test standard exchanges. The
+    verify* methods call helpers that this class does not define itself
+    (queue_declare, consume, assertPublishConsume, assertPublishGet,
+    assertEmpty, fail) and use self.channel, so it has to be combined with
+    a test base class that provides them.
+    """
+
+    def verifyDirectExchange(self, ex):
+        """Verify that ex behaves like a direct exchange.
+
+        Binds queue "q" to ex with routing key "k", then checks
+        assertPublishConsume with key "k" and expects Queue.Empty when the
+        same check is made with the unbound key "kk".
+        """
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+        self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+        try:
+            self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+            self.fail("Expected Empty exception")
+        except Queue.Empty:
+            pass # Expected
+
+    def verifyFanOutExchange(self, ex):
+        """Verify that ex behaves like a fanout exchange.
+
+        Binds queues "q" and "p" to ex without a routing key and runs
+        assertPublishGet against a consumer on each of them.
+        """
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex)
+        self.queue_declare(queue="p")
+        self.channel.queue_bind(queue="p", exchange=ex)
+        for qname in ["q", "p"]:
+            self.assertPublishGet(self.consume(qname), ex)
+
+    def verifyTopicExchange(self, ex):
+        """Verify that ex behaves like a topic exchange.
+
+        Binds queue "a" with the pattern "a.#.b.*", checks three keys that
+        should match and then publishes keys that should not.
+        """
+        self.queue_declare(queue="a")
+        self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+        q = self.consume("a")
+        self.assertPublishGet(q, ex, "a.b.x")
+        self.assertPublishGet(q, ex, "a.x.b.x")
+        self.assertPublishGet(q, ex, "a.x.x.b.x")
+        # Shouldn't match
+        for key in ["a.b", "a.b.x.y", "x.a.b.x"]:
+            self.channel.message_transfer(destination=ex, routing_key=key, body="")
+        # assertEmpty, as in verifyHeadersExchange, instead of q.empty():
+        # empty() only samples the local queue at the moment of the call.
+        self.assertEmpty(q)
+
+    def verifyHeadersExchange(self, ex):
+        """Verify that ex is a headers exchange.
+
+        Binds queue "q" with x-match "all" on name/age, checks a publish
+        carrying both headers and then one carrying none.
+        """
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+        q = self.consume("q")
+        headers = {"name":"fred", "age":3}
+        self.assertPublishGet(q, exchange=ex, properties=headers)
+        self.channel.message_transfer(destination=ex, body="") # No headers, won't deliver
+        self.assertEmpty(q)
+        
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server SHOULD implement these standard exchange types: topic, headers.
+    
+    Client attempts to declare an exchange with each of these standard types.
+    The direct and fanout types are exercised here as well.
+    """
+
+    # NOTE(review): ExampleTest calls exchange_declare(channel, 0, "test",
+    # "direct"), i.e. with the channel first; the calls below pass 0 as the
+    # only positional argument - confirm against TestBase.exchange_declare.
+
+    def testDirect(self):
+        """Declare and test a direct exchange"""
+        self.exchange_declare(0, exchange="d", type="direct")
+        self.verifyDirectExchange("d")
+
+    def testFanout(self):
+        """Declare and test a fanout exchange"""
+        self.exchange_declare(0, exchange="f", type="fanout")
+        self.verifyFanOutExchange("f")
+
+    def testTopic(self):
+        """Declare and test a topic exchange"""
+        self.exchange_declare(0, exchange="t", type="topic")
+        self.verifyTopicExchange("t")
+
+    def testHeaders(self):
+        """Declare and test a headers exchange"""
+        self.exchange_declare(0, exchange="h", type="headers")
+        self.verifyHeadersExchange("h")
+        
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST, in each virtual host, pre-declare an exchange instance
+    for each standard exchange type that it implements, where the name of the
+    exchange instance is amq. followed by the exchange type name.
+
+    Client creates a temporary queue and attempts to bind to each required
+    exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+    those types are defined).
+    """
+    def testAmqDirect(self):
+        """Run the direct exchange checks against amq.direct."""
+        self.verifyDirectExchange("amq.direct")
+
+    def testAmqFanOut(self):
+        """Run the fanout exchange checks against amq.fanout."""
+        self.verifyFanOutExchange("amq.fanout")
+
+    def testAmqTopic(self):
+        """Run the topic exchange checks against amq.topic."""
+        self.verifyTopicExchange("amq.topic")
+
+    def testAmqMatch(self):
+        """Run the headers exchange checks against amq.match."""
+        self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST predeclare a direct exchange to act as the default exchange
+    for content Publish methods and for default queue bindings.
+    
+    Client checks that the default exchange is active by specifying a queue
+    binding with no exchange name, and publishing a message with a suitable
+    routing key but without specifying the exchange name, then ensuring that
+    the message arrives in the queue correctly.
+    """
+    def testDefaultExchange(self):
+        """Check the default exchange, addressed by the empty exchange name."""
+        # Test automatic binding by queue name.
+        self.queue_declare(queue="d")
+        self.assertPublishConsume(queue="d", routing_key="d")
+        # Test explicit bind to default exchange
+        self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+    """
+    The server MUST NOT allow clients to access the default exchange except
+    by specifying an empty exchange name in the Queue.Bind and content Publish
+    methods.
+    """
+
+class ExtensionsRuleTests(TestBase):
+    """
+    The server MAY implement other exchange types as wanted.
+    """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+    """
+    The server SHOULD support a minimum of 16 exchanges per virtual host and
+    ideally, impose no limit except as defined by available resources.
+    
+    The client creates as many exchanges as it can until the server reports
+    an error; the number of exchanges successfully created must be at least
+    sixteen.
+    """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access to
+    the realm in which the exchange exists or will be created, or "passive"
+    access if the if-exists flag is set.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+    """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+    """
+    Exchange names starting with "amq." are reserved for predeclared and
+    standardised exchanges. The client MUST NOT attempt to create an exchange
+    starting with "amq.".
+    
+    
+    """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+    """
+    Exchanges cannot be redeclared with different types.  The client MUST NOT
+    attempt to redeclare an existing exchange with a different type than used
+    in the original Exchange.Declare method.
+    """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to create an exchange with a type that the
+    server does not support.
+    
+    
+    """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+    """
+    If set, and the exchange does not already exist, the server MUST raise a
+    channel exception with reply code 404 (not found).    
+    """
+    def test(self):
+        """
+        Passively declare the exchange "humpty_dumpty" and expect the call
+        to raise Closed carrying a 404 channel exception.
+        """
+        try:
+            self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+            # Only reached if the declare did not raise.
+            self.fail("Expected 404 for passive declaration of unknown exchange.")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+    """
+    The server MUST support both durable and transient exchanges.
+    
+    
+    """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the durable field if the exchange already exists.
+    
+    
+    """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the auto-delete field if the exchange already
+    exists.
+    
+    
+    """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access
+    rights to the exchange's access realm.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+    """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to delete an exchange that does not exist.
+    """
+
+
+class HeadersExchangeTests(TestBase):
+    """
+    Tests for headers exchange functionality, run against amq.match.
+    """
+    def setUp(self):
+        """Run the base set-up, then declare queue "q" and consume from it as self.q."""
+        TestBase.setUp(self)
+        self.queue_declare(queue="q")
+        self.q = self.consume("q")
+
+    def myAssertPublishGet(self, headers):
+        """Run assertPublishGet on self.q for a publish to amq.match with these headers."""
+        self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
+
+    def myBasicPublish(self, headers):
+        """Transfer a "foobar" message with these application headers to amq.match."""
+        self.channel.message_transfer(destination="amq.match", body="foobar", application_headers=headers)
+
+    def testMatchAll(self):
+        """Bind "q" with x-match 'all' and check matching and non-matching headers."""
+        binding = { 'x-match':'all', "name":"fred", "age":3}
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments=binding)
+        for matching in [{"name":"fred", "age":3},
+                         {"name":"fred", "age":3, "extra":"ignoreme"}]:
+            self.myAssertPublishGet(matching)
+
+        # None of these should match
+        for rejected in [{}, {"name":"barney"}, {"name":10}, {"name":"fred", "age":2}]:
+            self.myBasicPublish(rejected)
+        self.assertEmpty(self.q)
+
+    def testMatchAny(self):
+        """Bind "q" with x-match 'any' and check matching and non-matching headers."""
+        binding = { 'x-match':'any', "name":"fred", "age":3}
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments=binding)
+        for matching in [{"name":"fred"},
+                         {"name":"fred", "ignoreme":10},
+                         {"ignoreme":10, "age":3}]:
+            self.myAssertPublishGet(matching)
+
+        # Won't match
+        for rejected in [{}, {"irrelevant":0}]:
+            self.myBasicPublish(rejected)
+        self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+    """
+    Test some miscellaneous error conditions
+    """
+    def testTypeNotKnown(self):
+        """Declaring an exchange of type "invalid_type" must raise Closed with a 503."""
+        name = "test_type_not_known_exchange"
+        try:
+            self.channel.exchange_declare(exchange=name, type="invalid_type")
+            self.fail("Expected 503 for declaration of unknown exchange type.")
+        except Closed, closed:
+            self.assertConnectionException(503, closed.args[0])
+
+    def testDifferentDeclaredType(self):
+        """Redeclaring a direct exchange as topic must raise Closed with a 530."""
+        name = "test_different_declared_type_exchange"
+        self.channel.exchange_declare(exchange=name, type="direct")
+        try:
+            self.channel.exchange_declare(exchange=name, type="topic")
+            self.fail("Expected 530 for redeclaration of exchange with different type.")
+        except Closed, closed:
+            self.assertConnectionException(530, closed.args[0])
+        #cleanup over a fresh connection
+        # NOTE(review): presumably needed because the 530 closed the original
+        # connection - confirm
+        cleanup_client = self.connect()
+        cleanup_channel = cleanup_client.channel(1)
+        cleanup_channel.channel_open()
+        cleanup_channel.exchange_delete(exchange=name)
+    

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10/execution.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/execution.py?view=auto&rev=559055
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/execution.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/execution.py Tue Jul 24 07:08:32 2007
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExecutionTests (TestBase):
+    """Tests for the execution commands of the channel."""
+
+    def test_flush(self):
+        """
+        Three times in a row: publish, call execution_flush, and require
+        that channel.completion.wait() for the current command_id returns a
+        true value when given timeout=1.
+        """
+        channel = self.channel
+        for i in [1, 2, 3]:
+            channel.basic_publish()
+            channel.execution_flush()
+            # self.assert_ rather than a bare assert statement: assert
+            # statements are removed when python runs with -O, which would
+            # turn this check into a no-op.
+            self.assert_(channel.completion.wait(channel.completion.command_id, timeout=1))

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10/execution.py
------------------------------------------------------------------------------
    svn:eol-style = native