You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/03 22:52:52 UTC

svn commit: r1519846 - in /qpid/trunk/qpid/cpp/src: qpid/ha/PrimaryTxObserver.cpp qpid/ha/PrimaryTxObserver.h tests/ha_tests.py

Author: aconway
Date: Tue Sep  3 20:52:51 2013
New Revision: 1519846

URL: http://svn.apache.org/r1519846
Log:
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.

Was not removing transaction exchange at end of transaction.
Fix to transaction end logic.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Sep  3 20:52:51 2013
@@ -47,7 +47,7 @@ using namespace qpid::framing;
 class PrimaryTxObserver::Exchange : public broker::Exchange {
   public:
     Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
-        broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+        broker::Exchange(tx_->getExchangeName()),
         tx(tx_)
     {
         dispatch[TxPrepareOkEvent::KEY] =
@@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : publ
     DispatchMap dispatch;
     boost::shared_ptr<PrimaryTxObserver> tx;
 };
+
 const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
 
 PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
-    haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
+    haBroker(hb), broker(hb.getBroker()), id(true),
+    exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+    failed(false), ended(false)
 {
     logPrefix = "Primary transaction "+shortStr(id)+": ";
 
@@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Commit");
     txQueue->deliver(TxCommitEvent().message());
+    ended = true;
     end(l);
 }
 
@@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Rollback");
     txQueue->deliver(TxRollbackEvent().message());
+    ended = true;
     end(l);
 }
 
 void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
     // Don't destroy the tx-queue if there are connected subscriptions.
-    if (unfinished.empty()) {
-        // Destroying the queue will result in destruction of this when
-        // the queues observer references are cleared.
+    if (ended && unfinished.empty()) {
         haBroker.deleteQueue(txQueue->getName());
+        broker.getExchanges().destroy(getExchangeName());
     }
 }
 
@@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const Rep
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = rs.getBrokerInfo().getSystemId();
     if (unprepared.find(backup) != unprepared.end()) {
-        failed = true;            // Canceled before prepared.
+        ended = failed = true;    // Canceled before prepared.
         unprepared.erase(backup); // Consider it prepared-fail
     }
     unfinished.erase(backup);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Tue Sep  3 20:52:51 2013
@@ -76,6 +76,7 @@ class PrimaryTxObserver : public broker:
 
     types::Uuid getId() const { return id; }
     QueuePtr getTxQueue() const { return txQueue; }
+    std::string getExchangeName() const { return exchangeName; }
 
     // Notify that a backup subscription has been cancelled.
     void cancel(const ReplicatingSubscription&);
@@ -96,10 +97,12 @@ class PrimaryTxObserver : public broker:
     std::string logPrefix;
     HaBroker& haBroker;
     broker::Broker& broker;
+
     types::Uuid id;
+    std::string exchangeName;
     QueuePtr txQueue;
     QueueIdsMap enqueues;
-    bool failed;
+    bool failed, ended;
 
     UuidSet members;            // All members of transaction.
     UuidSet unprepared;         // Members that have not yet responded to prepare.

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Sep  3 20:52:51 2013
@@ -37,6 +37,7 @@ def grep(filename, regexp):
 
 class HaBrokerTest(BrokerTest):
     """Base class for HA broker tests"""
+
     def assert_log_no_errors(self, broker):
         log = broker.get_log()
         if grep(log, re.compile("] error|] critical")):
@@ -1329,8 +1330,7 @@ class TransactionTests(BrokerTest):
         cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
         tx.sync()
-
-        self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+        tx_queues = cluster[0].agent().tx_queues()
 
         # NOTE: backup does not process transactional dequeues until prepare
         cluster[1].assert_browse_backup("a", ["x","y","z"])
@@ -1340,10 +1340,26 @@ class TransactionTests(BrokerTest):
         tx.commit()
         tx.sync()
 
-        for b in cluster: self.assert_simple_commit_outcome(b)
-        self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+        for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+    def assert_tx_cleanup(self, b, tx_queues):
+        """Verify that there are no transaction artifacts
+        (exchanges, queues, subscriptions) on b."""
 
-    def assert_simple_commit_outcome(self, b):
+        self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+        self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+        # TX exchanges don't show up in management so test for existence by name.
+        s = b.connect_admin().session()
+        try:
+            for q in tx_queues:
+                try:
+                    s.sender("%s;{node:{type:topic}}"%q)
+                    self.fail("Found tx exchange %s on %s "%(q,b))
+                except NotFound: pass
+        finally: s.connection.close()
+
+    def assert_simple_commit_outcome(self, b, tx_queues):
         b.assert_browse_backup("a", [], msg=b)
         b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
         # Check for expected actions on the store
@@ -1357,17 +1373,18 @@ class TransactionTests(BrokerTest):
 <commit tx=1>
 """
         self.assertEqual(expect, open_read(b.store_log), msg=b)
-        # Check that transaction artifacts are cleaned up.
-        self.assertEqual([], b.agent().tx_queues(), msg=b)
+        self.assert_tx_cleanup(b, tx_queues)
 
     def test_tx_simple_rollback(self):
         cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
         tx.acknowledge()
         tx.rollback()
-        for b in cluster: self.assert_simple_rollback_outcome(b)
+        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
 
-    def assert_simple_rollback_outcome(self, b):
+    def assert_simple_rollback_outcome(self, b, tx_queues):
         b.assert_browse_backup("a", ["x","y","z"], msg=b)
         b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
         # Check for expected actions on the store
@@ -1376,19 +1393,19 @@ class TransactionTests(BrokerTest):
 <enqueue a z>
 """
         self.assertEqual(open_read(b.store_log), expect, msg=b)
-        # Check that transaction artifacts are cleaned up.
-        self.assertEqual([], b.agent().tx_queues(), msg=b)
+        self.assert_tx_cleanup(b, tx_queues)
 
     def test_tx_simple_failover(self):
         cluster = HaCluster(self, 3, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
         tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
         tx.acknowledge()
         cluster.bounce(0)       # Should cause roll-back
         cluster[0].wait_status("ready") # Restarted.
         cluster[1].wait_status("active") # Promoted.
         cluster[2].wait_status("ready")  # Failed over.
-        for b in cluster: self.assert_simple_rollback_outcome(b)
+        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
 
     def test_tx_no_backups(self):
         """Test the special case of a TX where there are no backups"""
@@ -1399,15 +1416,18 @@ class TransactionTests(BrokerTest):
         tx.acknowledge()
         tx.commit()
         tx.sync()
-        self.assert_simple_commit_outcome(cluster[0])
+        tx_queues = cluster[0].agent().tx_queues()
+        self.assert_simple_commit_outcome(cluster[0], tx_queues)
 
         # Test rollback
         cluster = HaCluster(self, 1, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+        tx_queues = cluster[0].agent().tx_queues()
         tx.acknowledge()
         tx.rollback()
         tx.sync()
-        self.assert_simple_rollback_outcome(cluster[0])
+        self.assert_simple_rollback_outcome(cluster[0], tx_queues)
 
 
     def test_tx_backup_fail(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org