You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/11 22:53:23 UTC
svn commit: r1522031 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/QueueRegistry.cpp qpid/ha/BrokerReplicator.cpp
qpid/ha/PrimaryTxObserver.cpp qpid/ha/PrimaryTxObserver.h
qpid/ha/TxReplicator.cpp qpid/ha/TxReplicator.h tests/ha_tests.py
Author: aconway
Date: Wed Sep 11 20:53:22 2013
New Revision: 1522031
URL: http://svn.apache.org/r1522031
Log:
QPID-5132: HA crash in test_tx_join_leave caused by double delete of queue.
Fix crash caused by double-delete of transaction queue when a broker joins while
a transaction is in progress.
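- Ignore mode for non-participants in TX: a backup that is not a member of the transaction ignores messages routed to its TxReplicator instead of terminating it.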
- Try/catch around queue & exchange deletion.
- Consistent use of QueueRegistry::get when queues are required.
- Remove unnecessary exchange delete warning.
- Remove HaBroker::delete, use realm@username when deleting queues.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Sep 11 20:53:22 2013
@@ -121,7 +121,9 @@ Queue::shared_ptr QueueRegistry::find(co
Queue::shared_ptr QueueRegistry::get(const string& name) {
Queue::shared_ptr q = find(name);
- if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ if (!q) {
+ throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
return q;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Sep 11 20:53:22 2013
@@ -269,7 +269,8 @@ class BrokerReplicator::UpdateTracker {
void clean(const std::string& name) {
QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
": no longer exists on primary");
- cleanFn(name);
+ try { cleanFn(name); }
+ catch (const framing::NotFoundException&) {}
}
std::string type;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Wed Sep 11 20:53:22 2013
@@ -83,7 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
replicationTest(hb.getSettings().replicateDefault.get()),
id(true),
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
- failed(false), ended(false)
+ failed(false), ended(false), complete(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -165,7 +165,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
- ended = true;
+ complete = true;
end(l);
}
@@ -173,16 +173,25 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
- ended = true;
+ complete = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
- // Don't destroy the tx-queue if there are connected subscriptions.
- if (ended && unfinished.empty()) {
- haBroker.getBroker().deleteQueue(
- txQueue->getName(), haBroker.getUserId(), string());
- broker.getExchanges().destroy(getExchangeName());
+ // Don't destroy the tx-queue until the transaction is complete and there
+ // are no connected subscriptions.
+ if (!ended && complete && unfinished.empty()) {
+ ended = true;
+ try {
+ haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Deleting transaction queue: " << e.what());
+ }
+ try {
+ broker.getExchanges().destroy(getExchangeName());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
+ }
}
}
@@ -207,7 +216,7 @@ void PrimaryTxObserver::cancel(const Rep
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- ended = failed = true; // Canceled before prepared.
+ complete = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Wed Sep 11 20:53:22 2013
@@ -103,7 +103,7 @@ class PrimaryTxObserver : public broker:
std::string exchangeName;
QueuePtr txQueue;
QueueIdsMap enqueues;
- bool failed, ended;
+ bool failed, ended, complete;
UuidSet members; // All members of transaction.
UuidSet unprepared; // Members that have not yet responded to prepare.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Wed Sep 11 20:53:22 2013
@@ -35,6 +35,7 @@
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/amqp_0_10/Connection.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/framing/BufferTypes.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -79,7 +80,7 @@ TxReplicator::TxReplicator(
txBuffer(new broker::TxBuffer),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- complete(false),
+ complete(false), ignore(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -119,6 +120,10 @@ void TxReplicator::sendMessage(const bro
}
}
+void TxReplicator::route(broker::Deliverable& deliverable) {
+ if (!ignore) QueueReplicator::route(deliverable);
+}
+
void TxReplicator::deliver(const broker::Message& m_) {
sys::Mutex::ScopedLock l(lock);
// Deliver message to the target queue, not the tx-queue.
@@ -215,30 +220,28 @@ void TxReplicator::rollback(const string
end(l);
}
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
TxMembersEvent e;
decodeStr(data, e);
QPID_LOG(debug, logPrefix << "Members: " << e.members);
if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
- QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
- end(l);
+ QPID_LOG(info, logPrefix << "Not participating in transaction");
+ ignore = true;
}
}
void TxReplicator::end(sys::Mutex::ScopedLock&) {
complete = true;
if (!getQueue()) return; // Already destroyed
- // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
- // Need to do this now to cancel the subscription to the primary tx-queue
- // which informs the primary that we have completed the transaction.
- haBroker.getBroker().deleteQueue(
- getQueue()->getName(), haBroker.getUserId(), string());
+ // Destroy will cancel the subscription to the primary tx-queue which
+ // informs the primary that we have completed the transaction.
+ destroy();
}
void TxReplicator::destroy() {
QueueReplicator::destroy();
sys::Mutex::ScopedLock l(lock);
- if (!complete) rollback(string(), l);
+ if (!ignore && !complete) rollback(string(), l);
}
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Wed Sep 11 20:53:22 2013
@@ -36,6 +36,7 @@ class TxAccept;
class DtxBuffer;
class Broker;
class MessageStore;
+class Deliverable;
}
namespace ha {
@@ -63,6 +64,7 @@ class TxReplicator : public QueueReplica
std::string getType() const;
// QueueReplicator overrides
+ void route(broker::Deliverable& deliverable);
void destroy();
protected:
@@ -91,7 +93,7 @@ class TxReplicator : public QueueReplica
broker::MessageStore* store;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
- bool complete;
+ bool complete, ignore;
// Class to process dequeues and create DeliveryRecords to populate a
// TxAccept.
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Sep 11 20:53:22 2013
@@ -1467,13 +1467,10 @@ class TransactionTests(BrokerTest):
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- tx_q = cluster[0].agent().tx_queues()[0]
cluster.restart(1)
- # Verify the new member should not be in the transaction.
- # but should receive the result of the transaction via normal replication.
- cluster[1].wait_no_queue(tx_q)
tx.commit()
- for b in cluster: b.assert_browse_backup("q", ["foo"])
+ # The new member is not in the tx but receives the results via normal replication.
+ for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
if __name__ == "__main__":
outdir = "ha_tests.tmp"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org