You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/03 22:52:52 UTC
svn commit: r1519846 - in /qpid/trunk/qpid/cpp/src:
qpid/ha/PrimaryTxObserver.cpp qpid/ha/PrimaryTxObserver.h tests/ha_tests.py
Author: aconway
Date: Tue Sep 3 20:52:51 2013
New Revision: 1519846
URL: http://svn.apache.org/r1519846
Log:
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction.
Fix to transaction end logic.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Sep 3 20:52:51 2013
@@ -47,7 +47,7 @@ using namespace qpid::framing;
class PrimaryTxObserver::Exchange : public broker::Exchange {
public:
Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
- broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+ broker::Exchange(tx_->getExchangeName()),
tx(tx_)
{
dispatch[TxPrepareOkEvent::KEY] =
@@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : publ
DispatchMap dispatch;
boost::shared_ptr<PrimaryTxObserver> tx;
};
+
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
+ haBroker(hb), broker(hb.getBroker()), id(true),
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ failed(false), ended(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ ended = true;
end(l);
}
@@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ ended = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
- if (unfinished.empty()) {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
+ if (ended && unfinished.empty()) {
haBroker.deleteQueue(txQueue->getName());
+ broker.getExchanges().destroy(getExchangeName());
}
}
@@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const Rep
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- failed = true; // Canceled before prepared.
+ ended = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Tue Sep 3 20:52:51 2013
@@ -76,6 +76,7 @@ class PrimaryTxObserver : public broker:
types::Uuid getId() const { return id; }
QueuePtr getTxQueue() const { return txQueue; }
+ std::string getExchangeName() const { return exchangeName; }
// Notify that a backup subscription has been cancelled.
void cancel(const ReplicatingSubscription&);
@@ -96,10 +97,12 @@ class PrimaryTxObserver : public broker:
std::string logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
+
types::Uuid id;
+ std::string exchangeName;
QueuePtr txQueue;
QueueIdsMap enqueues;
- bool failed;
+ bool failed, ended;
UuidSet members; // All members of transaction.
UuidSet unprepared; // Members that have not yet responded to prepare.
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1519846&r1=1519845&r2=1519846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Sep 3 20:52:51 2013
@@ -37,6 +37,7 @@ def grep(filename, regexp):
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
+
def assert_log_no_errors(self, broker):
log = broker.get_log()
if grep(log, re.compile("] error|] critical")):
@@ -1329,8 +1330,7 @@ class TransactionTests(BrokerTest):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
-
- self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+ tx_queues = cluster[0].agent().tx_queues()
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
@@ -1340,10 +1340,26 @@ class TransactionTests(BrokerTest):
tx.commit()
tx.sync()
- for b in cluster: self.assert_simple_commit_outcome(b)
- self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+ for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+ def assert_tx_cleanup(self, b, tx_queues):
+ """Verify that there are no transaction artifacts
+ (exchanges, queues, subscriptions) on b."""
- def assert_simple_commit_outcome(self, b):
+ self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+ self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+ # TX exchanges don't show up in management so test for existence by name.
+ s = b.connect_admin().session()
+ try:
+ for q in tx_queues:
+ try:
+ s.sender("%s;{node:{type:topic}}"%q)
+ self.fail("Found tx exchange %s on %s "%(q,b))
+ except NotFound: pass
+ finally: s.connection.close()
+
+ def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
# Check for expected actions on the store
@@ -1357,17 +1373,18 @@ class TransactionTests(BrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- # Check that transaction artifacts are cleaned up.
- self.assertEqual([], b.agent().tx_queues(), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
- for b in cluster: self.assert_simple_rollback_outcome(b)
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- def assert_simple_rollback_outcome(self, b):
+ def assert_simple_rollback_outcome(self, b, tx_queues):
b.assert_browse_backup("a", ["x","y","z"], msg=b)
b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
# Check for expected actions on the store
@@ -1376,19 +1393,19 @@ class TransactionTests(BrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- # Check that transaction artifacts are cleaned up.
- self.assertEqual([], b.agent().tx_queues(), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
cluster[0].wait_status("ready") # Restarted.
cluster[1].wait_status("active") # Promoted.
cluster[2].wait_status("ready") # Failed over.
- for b in cluster: self.assert_simple_rollback_outcome(b)
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
@@ -1399,15 +1416,18 @@ class TransactionTests(BrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
- self.assert_simple_commit_outcome(cluster[0])
+ tx_queues = cluster[0].agent().tx_queues()
+ self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
cluster = HaCluster(self, 1, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
tx.sync()
- self.assert_simple_rollback_outcome(cluster[0])
+ self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def test_tx_backup_fail(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org