You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/11 22:53:23 UTC

svn commit: r1522031 - in /qpid/trunk/qpid/cpp/src: qpid/broker/QueueRegistry.cpp qpid/ha/BrokerReplicator.cpp qpid/ha/PrimaryTxObserver.cpp qpid/ha/PrimaryTxObserver.h qpid/ha/TxReplicator.cpp qpid/ha/TxReplicator.h tests/ha_tests.py

Author: aconway
Date: Wed Sep 11 20:53:22 2013
New Revision: 1522031

URL: http://svn.apache.org/r1522031
Log:
QPID-5132: HA crash in test_tx_join_leave caused by double-delete of queue.

Fix crash caused by double-delete of transaction queue when a broker joins while
a transaction is in progress.

- Add an "ignore" mode for brokers that are not participants in the transaction.
- Wrap queue and exchange deletion in try/catch.
- Use QueueRegistry::get consistently where a queue is required.
- Remove an unnecessary exchange-delete warning.
- Remove HaBroker::delete; use realm@username when deleting queues.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Wed Sep 11 20:53:22 2013
@@ -121,7 +121,9 @@ Queue::shared_ptr QueueRegistry::find(co
 
 Queue::shared_ptr QueueRegistry::get(const string& name) {
     Queue::shared_ptr q = find(name);
-    if (!q) throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+    if (!q) {
+        throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
+    }
     return q;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Sep 11 20:53:22 2013
@@ -269,7 +269,8 @@ class BrokerReplicator::UpdateTracker {
     void clean(const std::string& name) {
         QPID_LOG(info, "Backup: Deleted " << type << " " << name <<
                  ": no longer exists on primary");
-        cleanFn(name);
+        try { cleanFn(name); }
+        catch (const framing::NotFoundException&) {}
     }
 
     std::string type;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Wed Sep 11 20:53:22 2013
@@ -83,7 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     replicationTest(hb.getSettings().replicateDefault.get()),
     id(true),
     exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
-    failed(false), ended(false)
+    failed(false), ended(false), complete(false)
 {
     logPrefix = "Primary transaction "+shortStr(id)+": ";
 
@@ -165,7 +165,7 @@ void PrimaryTxObserver::commit() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Commit");
     txQueue->deliver(TxCommitEvent().message());
-    ended = true;
+    complete = true;
     end(l);
 }
 
@@ -173,16 +173,25 @@ void PrimaryTxObserver::rollback() {
     sys::Mutex::ScopedLock l(lock);
     QPID_LOG(debug, logPrefix << "Rollback");
     txQueue->deliver(TxRollbackEvent().message());
-    ended = true;
+    complete = true;
     end(l);
 }
 
 void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
-    // Don't destroy the tx-queue if there are connected subscriptions.
-    if (ended && unfinished.empty()) {
-        haBroker.getBroker().deleteQueue(
-            txQueue->getName(), haBroker.getUserId(), string());
-        broker.getExchanges().destroy(getExchangeName());
+    // Don't destroy the tx-queue until the transaction is complete and there
+    // are no connected subscriptions.
+    if (!ended && complete && unfinished.empty()) {
+        ended = true;
+        try {
+            haBroker.getBroker().deleteQueue(txQueue->getName(), haBroker.getUserId(), string());
+        } catch (const std::exception& e) {
+            QPID_LOG(error, logPrefix << "Deleting transaction queue: "  << e.what());
+        }
+        try {
+            broker.getExchanges().destroy(getExchangeName());
+        } catch (const std::exception& e) {
+            QPID_LOG(error, logPrefix << "Deleting transaction exchange: "  << e.what());
+        }
     }
 }
 
@@ -207,7 +216,7 @@ void PrimaryTxObserver::cancel(const Rep
     sys::Mutex::ScopedLock l(lock);
     types::Uuid backup = rs.getBrokerInfo().getSystemId();
     if (unprepared.find(backup) != unprepared.end()) {
-        ended = failed = true;    // Canceled before prepared.
+        complete = failed = true;    // Canceled before prepared.
         unprepared.erase(backup); // Consider it prepared-fail
     }
     unfinished.erase(backup);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Wed Sep 11 20:53:22 2013
@@ -103,7 +103,7 @@ class PrimaryTxObserver : public broker:
     std::string exchangeName;
     QueuePtr txQueue;
     QueueIdsMap enqueues;
-    bool failed, ended;
+    bool failed, ended, complete;
 
     UuidSet members;            // All members of transaction.
     UuidSet unprepared;         // Members that have not yet responded to prepare.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Wed Sep 11 20:53:22 2013
@@ -35,6 +35,7 @@
 #include "qpid/broker/TxBuffer.h"
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/amqp_0_10/Connection.h"
+#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/framing/BufferTypes.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
@@ -79,7 +80,7 @@ TxReplicator::TxReplicator(
     txBuffer(new broker::TxBuffer),
     store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
     channel(link->nextChannel()),
-    complete(false),
+    complete(false), ignore(false),
     dequeueState(hb.getBroker().getQueues())
 {
     string id(getTxId(txQueue->getName()));
@@ -119,6 +120,10 @@ void TxReplicator::sendMessage(const bro
     }
 }
 
+void TxReplicator::route(broker::Deliverable& deliverable) {
+    if (!ignore) QueueReplicator::route(deliverable);
+}
+
 void TxReplicator::deliver(const broker::Message& m_) {
     sys::Mutex::ScopedLock l(lock);
     // Deliver message to the target queue, not the tx-queue.
@@ -215,30 +220,28 @@ void TxReplicator::rollback(const string
     end(l);
 }
 
-void TxReplicator::members(const string& data, sys::Mutex::ScopedLock& l) {
+void TxReplicator::members(const string& data, sys::Mutex::ScopedLock&) {
     TxMembersEvent e;
     decodeStr(data, e);
     QPID_LOG(debug, logPrefix << "Members: " << e.members);
     if (!e.members.count(haBroker.getMembership().getSelf().getSystemId())) {
-        QPID_LOG(debug, logPrefix << "Not a member of transaction, terminating");
-        end(l);
+        QPID_LOG(info, logPrefix << "Not participating in transaction");
+        ignore = true;
     }
 }
 
 void TxReplicator::end(sys::Mutex::ScopedLock&) {
     complete = true;
     if (!getQueue()) return;    // Already destroyed
-    // Destroy the tx-queue, which will destroy this via QueueReplicator destroy.
-    // Need to do this now to cancel the subscription to the primary tx-queue
-    // which informs the primary that we have completed the transaction.
-    haBroker.getBroker().deleteQueue(
-        getQueue()->getName(), haBroker.getUserId(), string());
+    // Destroy will cancel the subscription to the primary tx-queue which
+    // informs the primary that we have completed the transaction.
+    destroy();
 }
 
 void TxReplicator::destroy() {
     QueueReplicator::destroy();
     sys::Mutex::ScopedLock l(lock);
-    if (!complete) rollback(string(), l);
+    if (!ignore && !complete) rollback(string(), l);
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Wed Sep 11 20:53:22 2013
@@ -36,6 +36,7 @@ class TxAccept;
 class DtxBuffer;
 class Broker;
 class MessageStore;
+class Deliverable;
 }
 
 namespace ha {
@@ -63,6 +64,7 @@ class TxReplicator : public QueueReplica
     std::string getType() const;
 
     // QueueReplicator overrides
+    void route(broker::Deliverable& deliverable);
     void destroy();
 
   protected:
@@ -91,7 +93,7 @@ class TxReplicator : public QueueReplica
     broker::MessageStore* store;
     std::auto_ptr<broker::TransactionContext> context;
     framing::ChannelId channel; // Channel to send prepare-complete.
-    bool complete;
+    bool complete, ignore;
 
     // Class to process dequeues and create DeliveryRecords to populate a
     // TxAccept.

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1522031&r1=1522030&r2=1522031&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Sep 11 20:53:22 2013
@@ -1467,13 +1467,10 @@ class TransactionTests(BrokerTest):
         tx = cluster[0].connect().session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("foo")
-        tx_q = cluster[0].agent().tx_queues()[0]
         cluster.restart(1)
-        # Verify the new member should not be in the transaction.
-        # but should receive the result of the transaction via normal replication.
-        cluster[1].wait_no_queue(tx_q)
         tx.commit()
-        for b in cluster: b.assert_browse_backup("q", ["foo"])
+        # The new member is not in the tx but  receives the results normal replication.
+        for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
 
 if __name__ == "__main__":
     outdir = "ha_tests.tmp"



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org