You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/05 20:56:48 UTC

svn commit: r634003 [3/3] - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/tests/ cpp/xml/ python/ python/qpid/ python/tests_0-10/ python/tests_0-10_preview/

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/query.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/query.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/query.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/query.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueryTests(TestBase):
+    """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+    def test_exchange_query(self):
+        """
+        Test that the exchange_query method works as expected
+        """
+        channel = self.channel
+        #check returned type for the standard exchanges
+        self.assert_type("direct", channel.exchange_query(name="amq.direct"))
+        self.assert_type("topic", channel.exchange_query(name="amq.topic"))
+        self.assert_type("fanout", channel.exchange_query(name="amq.fanout"))
+        self.assert_type("headers", channel.exchange_query(name="amq.match"))
+        self.assert_type("direct", channel.exchange_query(name=""))        
+        #declare an exchange
+        channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+        #check that the result of a query is as expected
+        response = channel.exchange_query(name="my-test-exchange")
+        self.assert_type("direct", response)
+        self.assertEqual(False, response.durable)
+        self.assertEqual(False, response.not_found)
+        #delete the exchange
+        channel.exchange_delete(exchange="my-test-exchange")
+        #check that the query now reports not-found
+        self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+    def assert_type(self, expected_type, response):
+        self.assertEqual(expected_type, response.__getattr__("type"))
+
+    def test_binding_query_direct(self):
+        """
+        Test that the binding_query method works as expected with the direct exchange
+        """
+        self.binding_query_with_key("amq.direct")
+
+    def test_binding_query_topic(self):
+        """
+        Test that the binding_query method works as expected with the direct exchange
+        """
+        self.binding_query_with_key("amq.topic")
+
+    def binding_query_with_key(self, exchange_name):
+        channel = self.channel
+        #setup: create two queues
+        channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        
+        channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+        # test detection of any binding to specific queue
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+
+        # test detection of specific binding to any queue
+        response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test detection of specific binding to specific queue
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        # test unspecified queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test matched queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test unmatched queue, matched binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test unmatched queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+    def test_binding_query_fanout(self):
+        """
+        Test that the binding_query method works as expected with fanout exchange
+        """
+        channel = self.channel
+        #setup
+        channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+        # test detection of any binding to specific queue
+        response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+    def test_binding_query_header(self):
+        """
+        Test that the binding_query method works as expected with headers exchanges
+        """
+        channel = self.channel
+        #setup
+        channel.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+        # test detection of any binding to specific queue
+        response = channel.binding_query(exchange="amq.match", queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+
+        # test detection of specific binding to any queue
+        response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.args_not_matched)        
+
+        # test detection of specific binding to specific queue
+        response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(False, response.args_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = channel.binding_query(exchange="amq.match", queue="unused-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        # test unspecified queue, unmatched binding
+        response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.args_not_matched)        
+
+        # test matched queue, unmatched binding
+        response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(True, response.args_not_matched)        
+
+        # test unmatched queue, matched binding
+        response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(False, response.args_not_matched)        
+
+        # test unmatched queue, unmatched binding
+        response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(True, response.args_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
+        

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/query.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/queue.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/queue.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/queue.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/queue.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueueTests(TestBase):
+    """Tests for 'methods' on the amqp queue 'class'"""
+
+    def test_purge(self):
+        """
+        Test that the purge method removes messages from the queue
+        """
+        channel = self.channel
+        #setup, declare a queue and add some messages to it:
+        channel.exchange_declare(exchange="test-exchange", type="direct")
+        channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
+        channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
+        channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
+
+        #check that the queue now reports 3 messages:
+        channel.queue_declare(queue="test-queue")
+        reply = channel.queue_query(queue="test-queue")
+        self.assertEqual(3, reply.message_count)
+
+        #now do the purge, then test that three messages are purged and the count drops to 0
+        channel.queue_purge(queue="test-queue");
+        reply = channel.queue_query(queue="test-queue")
+        self.assertEqual(0, reply.message_count)        
+
+        #send a further message and consume it, ensuring that the other messages are really gone
+        channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
+        self.subscribe(queue="test-queue", destination="tag")
+        queue = self.client.queue("tag")
+        msg = queue.get(timeout=1)
+        self.assertEqual("four", msg.content.body)
+
+        #check error conditions (use new channels): 
+        channel = self.client.channel(2)
+        channel.session_open()
+        try:
+            #queue specified but doesn't exist:
+            channel.queue_purge(queue="invalid-queue")
+            self.fail("Expected failure when purging non-existent queue")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        channel = self.client.channel(3)
+        channel.session_open()
+        try:
+            #queue not specified and none previously declared for channel:
+            channel.queue_purge()
+            self.fail("Expected failure when purging unspecified queue")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+        #cleanup    
+        other = self.connect()
+        channel = other.channel(1)
+        channel.session_open()
+        channel.exchange_delete(exchange="test-exchange")
+
+    def test_declare_exclusive(self):
+        """
+        Test that the exclusive field is honoured in queue.declare
+        """
+        # TestBase.setUp has already opened channel(1)
+        c1 = self.channel
+        # Here we open a second separate connection:
+        other = self.connect()
+        c2 = other.channel(1)
+        c2.session_open()
+
+        #declare an exclusive queue:
+        c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+        try:
+            #other connection should not be allowed to declare this:
+            c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+            self.fail("Expected second exclusive queue_declare to raise a channel exception")
+        except Closed, e:
+            self.assertChannelException(405, e.args[0])
+
+
+    def test_declare_passive(self):
+        """
+        Test that the passive field is honoured in queue.declare
+        """
+        channel = self.channel
+        #declare an exclusive queue:
+        channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="passive-queue-1", passive=True)
+        try:
+            #other connection should not be allowed to declare this:
+            channel.queue_declare(queue="passive-queue-2", passive=True)
+            self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+    def test_bind(self):
+        """
+        Test various permutations of the queue.bind method
+        """
+        channel = self.channel
+        channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+
+        #straightforward case, both exchange & queue exist so no errors expected:
+        channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+        #use the queue name where the routing key is not specified:
+        channel.queue_bind(queue="queue-1", exchange="amq.direct")
+
+        #try and bind to non-existant exchange
+        try:
+            channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+            self.fail("Expected bind to non-existant exchange to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        #need to reopen a channel:    
+        channel = self.client.channel(2)
+        channel.session_open()
+
+        #try and bind non-existant queue:
+        try:
+            channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+            self.fail("Expected bind of non-existant queue to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+    def test_unbind_direct(self):
+        self.unbind_test(exchange="amq.direct", routing_key="key")
+
+    def test_unbind_topic(self):
+        self.unbind_test(exchange="amq.topic", routing_key="key")
+
+    def test_unbind_fanout(self):
+        self.unbind_test(exchange="amq.fanout")
+
+    def test_unbind_headers(self):
+        self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+    def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+        #bind two queues and consume from them
+        channel = self.channel
+        
+        channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+
+        self.subscribe(queue="queue-1", destination="queue-1")
+        self.subscribe(queue="queue-2", destination="queue-2")
+
+        queue1 = self.client.queue("queue-1")
+        queue2 = self.client.queue("queue-2")
+
+        channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+        channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+
+        #send a message that will match both bindings
+        channel.message_transfer(destination=exchange,
+                                 content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
+        
+        #unbind first queue
+        channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+        
+        #send another message
+        channel.message_transfer(destination=exchange,
+                                 content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
+
+        #check one queue has both messages and the other has only one
+        self.assertEquals("one", queue1.get(timeout=1).content.body)
+        try:
+            msg = queue1.get(timeout=1)
+            self.fail("Got extra message: %s" % msg.content.body)
+        except Empty: pass
+
+        self.assertEquals("one", queue2.get(timeout=1).content.body)
+        self.assertEquals("two", queue2.get(timeout=1).content.body)
+        try:
+            msg = queue2.get(timeout=1)
+            self.fail("Got extra message: " + msg)
+        except Empty: pass        
+
+
+    def test_delete_simple(self):
+        """
+        Test core queue deletion behaviour
+        """
+        channel = self.channel
+
+        #straight-forward case:
+        channel.queue_declare(queue="delete-me")
+        channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
+        channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
+        channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+        channel.queue_delete(queue="delete-me")
+        #check that it has gone be declaring passively
+        try:
+            channel.queue_declare(queue="delete-me", passive=True)
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        #check attempted deletion of non-existant queue is handled correctly:    
+        channel = self.client.channel(2)
+        channel.session_open()
+        try:
+            channel.queue_delete(queue="i-dont-exist", if_empty=True)
+            self.fail("Expected delete of non-existant queue to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        
+
+    def test_delete_ifempty(self):
+        """
+        Test that if_empty field of queue_delete is honoured
+        """
+        channel = self.channel
+
+        #create a queue and add a message to it (use default binding):
+        channel.queue_declare(queue="delete-me-2")
+        channel.queue_declare(queue="delete-me-2", passive=True)
+        channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+
+        #try to delete, but only if empty:
+        try:
+            channel.queue_delete(queue="delete-me-2", if_empty=True)
+            self.fail("Expected delete if_empty to fail for non-empty queue")
+        except Closed, e:
+            self.assertChannelException(406, e.args[0])
+
+        #need new channel now:    
+        channel = self.client.channel(2)
+        channel.session_open()
+
+        #empty queue:
+        self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
+        queue = self.client.queue("consumer_tag")
+        msg = queue.get(timeout=1)
+        self.assertEqual("message", msg.content.body)
+        channel.message_cancel(destination="consumer_tag")
+
+        #retry deletion on empty queue:
+        channel.queue_delete(queue="delete-me-2", if_empty=True)
+
+        #check that it has gone by declaring passively:
+        try:
+            channel.queue_declare(queue="delete-me-2", passive=True)
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+        
+    def test_delete_ifunused(self):
+        """
+        Test that if_unused field of queue_delete is honoured
+        """
+        channel = self.channel
+
+        #create a queue and register a consumer:
+        channel.queue_declare(queue="delete-me-3")
+        channel.queue_declare(queue="delete-me-3", passive=True)
+        self.subscribe(destination="consumer_tag", queue="delete-me-3")
+
+        #need new channel now:    
+        channel2 = self.client.channel(2)
+        channel2.session_open()
+        #try to delete, but only if empty:
+        try:
+            channel2.queue_delete(queue="delete-me-3", if_unused=True)
+            self.fail("Expected delete if_unused to fail for queue with existing consumer")
+        except Closed, e:
+            self.assertChannelException(406, e.args[0])
+
+
+        channel.message_cancel(destination="consumer_tag")    
+        channel.queue_delete(queue="delete-me-3", if_unused=True)
+        #check that it has gone by declaring passively:
+        try:
+            channel.queue_declare(queue="delete-me-3", passive=True)
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+    def test_autodelete_shared(self):
+        """
+        Test auto-deletion (of non-exclusive queues)
+        """
+        channel = self.channel
+        other = self.connect()
+        channel2 = other.channel(1)
+        channel2.session_open()
+
+        channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+        #consume from both channels
+        reply = channel.basic_consume(queue="auto-delete-me")
+        channel2.basic_consume(queue="auto-delete-me")
+
+        #implicit cancel
+        channel2.session_close()
+
+        #check it is still there
+        channel.queue_declare(queue="auto-delete-me", passive=True)
+
+        #explicit cancel => queue is now unused again:
+        channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+        #NOTE: this assumes there is no timeout in use
+
+        #check that it has gone be declaring passively
+        try:
+            channel.queue_declare(queue="auto-delete-me", passive=True)
+            self.fail("Expected queue to have been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/queue.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/testlib.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/testlib.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/testlib.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Tests for the testlib itself.
+# 
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from Queue import Empty
+
+import sys
+from traceback import *
+
+def mytrace(frame, event, arg):
+    print_stack(frame);
+    print "===="
+    return mytrace
+    
+class TestBaseTest(TestBase):
+    """Verify TestBase functions work as expected""" 
+
+    def testAssertEmptyPass(self):
+        """Test assert empty works"""
+        self.queue_declare(queue="empty")
+        q = self.consume("empty")
+        self.assertEmpty(q)
+        try:
+            q.get(timeout=1)
+            self.fail("Queue is not empty.")
+        except Empty: None              # Ignore
+
+    def testAssertEmptyFail(self):
+        self.queue_declare(queue="full")
+        q = self.consume("full")
+        self.channel.message_transfer(content=Content("", properties={'routing_key':"full"}))
+        try:
+            self.assertEmpty(q);
+            self.fail("assertEmpty did not assert on non-empty queue")
+        except AssertionError: None     # Ignore
+
+    def testMessageProperties(self):
+        """Verify properties are passed with message"""
+        props={"x":1, "y":2}
+        self.queue_declare(queue="q")
+        q = self.consume("q")
+        self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/testlib.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/tx.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/tx.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/tx.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class TxTests(TestBase):
+    """
+    Tests for 'methods' on the amqp tx 'class'
+    """
+
+    def test_commit(self):
+        """
+        Test that commited publishes are delivered and commited acks are not re-delivered
+        """
+        channel2 = self.client.channel(2)
+        channel2.session_open()
+        self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+        channel2.tx_commit()
+        channel2.session_close()
+
+        #use a different channel with new subscriptions to ensure
+        #there is no redelivery of acked messages:
+        channel = self.channel
+        channel.tx_select()
+
+        self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1)
+        queue_a = self.client.queue("qa")
+
+        self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1)
+        queue_b = self.client.queue("qb")
+
+        self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1)
+        queue_c = self.client.queue("qc")
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_c.get(timeout=1)
+            self.assertEqual("TxMessage %d" % i, msg.content.body)
+            msg.complete()
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("TxMessage 6", msg.content.body)
+        msg.complete()
+
+        msg = queue_a.get(timeout=1)
+        self.assertEqual("TxMessage 7", msg.content.body)
+        msg.complete()
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        #cleanup
+        channel.tx_commit()
+
+    def test_auto_rollback(self):
+        """
+        Test that a channel closed with an open transaction is effectively rolled back
+        """
+        channel2 = self.client.channel(2)
+        channel2.session_open()
+        queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        channel2.session_close()
+        channel = self.channel
+        channel.tx_select()
+
+        self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1)
+        queue_a = self.client.queue("qa")
+
+        self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1)
+        queue_b = self.client.queue("qb")
+
+        self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1)
+        queue_c = self.client.queue("qc")
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+            msg.complete()
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.content.body)
+        msg.complete()
+
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.content.body)
+        msg.complete()
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        #cleanup
+        channel.tx_commit()
+
+    def test_rollback(self):
+        """
+        Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+        """
+        channel = self.channel
+        queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued)
+        for d in ["sub_a", "sub_b", "sub_c"]:
+            channel.message_stop(destination=d)
+
+        channel.tx_rollback()
+
+        #restart susbcriptions
+        for d in ["sub_a", "sub_b", "sub_c"]:
+            channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
+            channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+            msg.complete()
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.content.body)
+        msg.complete()
+
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.content.body)
+        msg.complete()
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        #cleanup
+        channel.tx_commit()
+
+    def perform_txn_work(self, channel, name_a, name_b, name_c):
+        """
+        Utility method that does some setup and some work under a transaction. Used for testing both
+        commit and rollback
+        """
+        #setup:
+        channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
+        channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
+        channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+
+        key = "my_key_" + name_b
+        topic = "my_topic_" + name_c 
+    
+        channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
+        channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+
+        for i in range(1, 5):
+            channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+
+        channel.message_transfer(destination="amq.direct",
+                                 content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
+        channel.message_transfer(destination="amq.topic",
+                                 content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+
+        channel.tx_select()
+
+        #consume and ack messages
+        self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
+        queue_a = self.client.queue("sub_a")
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        msg.complete()
+
+        self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
+        queue_b = self.client.queue("sub_b")
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.content.body)
+        msg.complete()
+
+        sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
+        queue_c = self.client.queue("sub_c")
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.content.body)
+        msg.complete()
+
+        #publish messages
+        for i in range(1, 5):
+            channel.message_transfer(destination="amq.topic",
+                                     content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
+                                                     body="TxMessage %d" % i))
+
+        channel.message_transfer(destination="amq.direct",
+                                 content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
+                                                 body="TxMessage 6"))
+        channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
+                                                 body="TxMessage 7"))
+        return queue_a, queue_b, queue_c

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/tx.py
------------------------------------------------------------------------------
    svn:eol-style = native