You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2010/10/22 23:40:39 UTC

svn commit: r1026501 - /qpid/trunk/qpid/cpp/src/tests/store.py

Author: shuston
Date: Fri Oct 22 21:40:39 2010
New Revision: 1026501

URL: http://svn.apache.org/viewvc?rev=1026501&view=rev
Log:
Test module to run recovery-oriented store tests.

Added:
    qpid/trunk/qpid/cpp/src/tests/store.py

Added: qpid/trunk/qpid/cpp/src/tests/store.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/store.py?rev=1026501&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/store.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/store.py Fri Oct 22 21:40:39 2010
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import errno, os, time
+from qpid.brokertest import *
+from qpid import compat, session
+from qpid.util import connect
+from qpid.connection import Connection
+from qpid.datatypes import Message, uuid4
+from qpid.queue import Empty
+
+class StoreTests(BrokerTest):
+
+  XA_RBROLLBACK = 1
+  XA_RBTIMEOUT = 2
+  XA_OK = 0
+  tx_counter = 0
+
+  def configure(self, config):
+    self.config = config
+    self.defines = self.config.defines
+    BrokerTest.configure(self, config)
+
+  def setup_connection(self):
+    socket = connect(self._broker.host(), self._broker.port())
+    return Connection(sock=socket)
+
+  def setup_session(self):
+    self.conn.start()
+    return self.conn.session(str(uuid4()))
+
+  def start_session(self):
+    self.conn = self.setup_connection()
+    self.ssn = self.setup_session()
+
+  def setUp(self):
+    BrokerTest.setUp(self)
+    self._broker = self.broker()
+    self.start_session()
+
+  def cycle_broker(self):
+    # tearDown resets working dir; change it back after.
+    d = os.getcwd()
+    BrokerTest.tearDown(self)
+    os.chdir(d)
+    self._broker = None
+    self._broker = self.broker()
+    self.conn = self.setup_connection()
+    self.ssn = self.setup_session()
+
+  def xid(self, txid):
+    StoreTests.tx_counter += 1
+    branchqual = "v%s" % StoreTests.tx_counter
+    return self.ssn.xid(format=0, global_id=txid, branch_id=branchqual)
+
+  def testDurableExchange(self):
+    try:
+      self.ssn.exchange_delete(exchange="DE1")
+    except:
+      # restart the session busted from the exception
+      self.start_session()
+
+    self.ssn.exchange_declare(exchange="DE1", type="direct", durable=True)
+    response = self.ssn.exchange_query(name="DE1")
+    self.assert_(response.durable)
+    self.assert_(not response.not_found)
+
+    # Cycle the broker and make sure the exchange recovers
+    self.cycle_broker()
+    response = self.ssn.exchange_query(name="DE1")
+    self.assert_(response.durable)
+    self.assert_(not response.not_found)
+
+    self.ssn.exchange_delete(exchange="DE1")
+
+  def testDurableQueues(self):
+    try:
+      self.ssn.queue_delete(queue="DQ1")
+    except:
+      self.start_session()
+
+    self.ssn.queue_declare(queue="DQ1", durable=True)
+    response = self.ssn.queue_query(queue="DQ1")
+    self.assertEqual("DQ1", response.queue)
+    self.assert_(response.durable)
+
+    # Cycle the broker and make sure the queue recovers
+    self.cycle_broker()
+    response = self.ssn.queue_query(queue="DQ1")
+    self.assertEqual("DQ1", response.queue)
+    self.assert_(response.durable)
+
+    self.ssn.queue_delete(queue="DQ1")
+
+  def testDurableBindings(self):
+    try:
+      self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1", binding_key="K1")
+    except:
+      self.start_session()
+    try:
+      self.ssn.exchange_delete(exchange="DB_E1")
+    except:
+      self.start_session()
+    try:
+      self.ssn.queue_delete(queue="DB_Q1")
+    except:
+      self.start_session()
+
+    self.ssn.queue_declare(queue="DB_Q1", durable=True)
+    self.ssn.exchange_declare(exchange="DB_E1", type="direct", durable=True)
+    self.ssn.exchange_bind(exchange="DB_E1", queue="DB_Q1", binding_key="K1")
+
+    # Cycle the broker and make sure the binding recovers
+    self.cycle_broker()
+    response = self.ssn.exchange_bound(exchange="DB_E1", queue="DB_Q1", binding_key="K1")
+    self.assert_(not response.exchange_not_found)
+    self.assert_(not response.queue_not_found)
+    self.assert_(not response.queue_not_matched)
+    self.assert_(not response.key_not_matched)
+
+    self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1", binding_key="K1")
+    self.ssn.exchange_delete(exchange="DB_E1")
+    self.ssn.queue_delete(queue="DB_Q1")
+
+  def testDtxRecoverPrepared(self):
+    try:
+      self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
+    except:
+      self.start_session()
+    try:
+      self.ssn.exchange_delete(exchange="Dtx_E")
+    except:
+      self.start_session()
+    try:
+      self.ssn.queue_delete(queue="Dtx_Q")
+    except:
+      self.start_session()
+
+    self.ssn.queue_declare(queue="Dtx_Q", auto_delete=False, durable=True)
+    self.ssn.exchange_declare(exchange="Dtx_E", type="direct", durable=True)
+    self.ssn.exchange_bind(exchange="Dtx_E", queue="Dtx_Q", binding_key="Dtx")
+    txid = self.xid("DtxRecoverPrepared")
+    self.ssn.dtx_select()
+    self.ssn.dtx_start(xid=txid)
+    # 2 = delivery_mode.persistent
+    dp = self.ssn.delivery_properties(routing_key="Dtx_Q", delivery_mode=2)
+    self.ssn.message_transfer(message=Message(dp, "transactional message"))
+    self.ssn.dtx_end(xid=txid)
+    self.assertEqual(self.XA_OK, self.ssn.dtx_prepare(xid=txid).status)
+    # Cycle the broker and make sure the xid is there, the message is not
+    # queued.
+    self.cycle_broker()
+    # The txid should be recovered and in doubt
+    xids = self.ssn.dtx_recover().in_doubt
+    xid_matched = False
+    for x in xids:
+      self.assertEqual(txid.format, x.format)
+      self.assertEqual(txid.global_id, x.global_id)
+      self.assertEqual(txid.branch_id, x.branch_id)
+      xid_matched = True
+    self.assert_(xid_matched)
+    self.ssn.message_subscribe(destination="dtx_msgs", queue="Dtx_Q", accept_mode=1, acquire_mode=0)
+    self.ssn.message_flow(unit = 1, value = 0xFFFFFFFFL, destination = "dtx_msgs")
+    self.ssn.message_flow(unit = 0, value = 10, destination = "dtx_msgs")
+    message_arrivals = self.ssn.incoming("dtx_msgs")
+    try:
+      message_arrivals.get(timeout=1)
+      assert False, 'Message present in queue before commit'
+    except Empty: pass
+    self.ssn.dtx_select()
+    self.assertEqual(self.XA_OK, self.ssn.dtx_commit(xid=txid, one_phase=False).status)
+    try:
+      msg = message_arrivals.get(timeout=1)
+      self.assertEqual("transactional message", msg.body)
+    except Empty:
+      assert False, 'Message should be present after dtx commit but is not'
+
+    self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
+    self.ssn.exchange_delete(exchange="Dtx_E")
+    self.ssn.queue_delete(queue="Dtx_Q")



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org