You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/12/10 15:11:37 UTC

svn commit: r1549844 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/

Author: aconway
Date: Tue Dec 10 14:11:36 2013
New Revision: 1549844

URL: http://svn.apache.org/r1549844
Log:
QPID-5404: HA broker message duplication when deleting a queue with an alt-exchange

The old code ran auto-delete on the backup when it disconnected from the
primary. This rerouted messages onto the alternate queue with incorrect
replication IDs taken from the original queue, and the backup then also
replicated the same rerouted messages from the primary, producing duplicates.
The solution is to process auto-deletes on the new primary and let them
replicate to the backups.

- Move all auto-delete logic into QueueReplicator.
- The new primary processes auto-delete via QueueReplicator as part of promotion.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Tue Dec 10 14:11:36 2013
@@ -100,8 +100,8 @@ Role* Backup::recover(Mutex::ScopedLock&
         // Reset membership before allowing backups to connect.
         backups = membership.otherBackups();
         membership.clear();
-        return new Primary(haBroker, backups);
     }
+    return new Primary(haBroker, backups);
 }
 
 Role* Backup::promote() {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Dec 10 14:11:36 2013
@@ -865,27 +865,14 @@ bool BrokerReplicator::hasBindings() { r
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
-void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
-    // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator
     if (qr) {
         qr->disconnect();
         if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
             // Transactions are aborted on failover so clean up tx-queues
             deleteQueue(qr->getQueue()->getName());
         }
-        else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
-            if (qr->getQueue()->getSettings().autoDeleteDelay) {
-                // Start the auto-delete timer
-                qr->getQueue()->releaseFromUse();
-                qr->getQueue()->scheduleAutoDelete();
-            }
-            else {
-                // Delete immediately. Don't purge, the primary is gone so we need
-                // to reroute the deleted messages.
-                deleteQueue(qr->getQueue()->getName(), false);
-            }
-        }
     }
 }
 
@@ -893,9 +880,9 @@ typedef vector<boost::shared_ptr<Exchang
 
 // Callback function for accumulating exchange candidates
 namespace {
-	void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
-		ev.push_back(i);
-	}
+void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
+    ev.push_back(i);
+}
 }
 
 // Called by ConnectionObserver::disconnected, disconnected from the network side.
@@ -907,7 +894,7 @@ void BrokerReplicator::disconnected() {
     ExchangeVector exs;
     exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
     for_each(exs.begin(), exs.end(),
-             boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
+             boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
 }
 
 void BrokerReplicator::setMembership(const Variant::List& brokers) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Dec 10 14:11:36 2013
@@ -148,7 +148,7 @@ class BrokerReplicator : public broker::
     void deleteQueue(const std::string& name, bool purge=true);
     void deleteExchange(const std::string& name);
 
-    void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
+    void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
     void setMembership(const types::Variant::List&); // Set membership from list.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Dec 10 14:11:36 2013
@@ -94,7 +94,16 @@ Primary::Primary(HaBroker& hb, const Bro
     logPrefix("Primary: "), active(false),
     replicationTest(hb.getSettings().replicateDefault.get())
 {
+    // Note that at this point, we are still rejecting client connections.
+    // So we are safe from client interference while we set up the primary.
+
     hb.getMembership().setStatus(RECOVERING);
+
+    // Process all QueueReplicators, handles auto-delete queues.
+    QueueReplicator::Vector qrs;
+    QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
+    std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
+
     broker::QueueRegistry& queues = hb.getBroker().getQueues();
     queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
     if (expect.empty()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Dec 10 14:11:36 2013
@@ -26,7 +26,6 @@
 #include "QueueGuard.h"
 #include "RemoteBackup.h"
 #include "ReplicatingSubscription.h"
-#include "QueueReplicator.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
@@ -121,7 +120,7 @@ void PrimaryTxObserver::initialize() {
         throw InvalidArgumentException(
             QPID_MSG(logPrefix << "TX replication queue already exists."));
     txQueue = result.first;
-    txQueue->markInUse(true); // Prevent auto-delete till we are done.
+    txQueue->markInUse(); // Prevent auto-delete till we are done.
     txQueue->deliver(TxBackupsEvent(backups).message());
 
 }
@@ -228,7 +227,8 @@ void PrimaryTxObserver::end(Mutex::Scope
     // If there are no outstanding completions, break pointer cycle here.
     // Otherwise break it in cancel() when the remaining completions are done.
     if (incomplete.empty()) txBuffer = 0;
-    txQueue->releaseFromUse(true); // txQueue will auto-delete
+    txQueue->releaseFromUse();  // txQueue will auto-delete
+    txQueue->scheduleAutoDelete();
     txQueue.reset();
     try {
         broker.getExchanges().destroy(getExchangeName());

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Dec 10 14:11:36 2013
@@ -50,6 +50,7 @@ using namespace framing::execution;
 using namespace std;
 using std::exception;
 using sys::Mutex;
+using boost::shared_ptr;
 
 const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
@@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(c
     return startsWith(name, QUEUE_REPLICATOR_PREFIX);
 }
 
+namespace {
+void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
+    shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+    if (qr) v.push_back(qr);
+}
+}
+
+void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
+    registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
+}
+
 class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
   public:
     ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
@@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroke
     framing::FieldTable args = getArgs();
     args.setString(QPID_REPLICATE, printable(NONE).str());
     setArgs(args);
+    // Don't allow backup queues to auto-delete, primary decides when to delete.
     if (q->isAutoDelete()) q->markInUse();
 
     dispatch[DequeueEvent::KEY] =
@@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::sha
 bool QueueReplicator::hasBindings() { return false; }
 std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
 
+void QueueReplicator::promoted() {
+    // Promoted to primary, deal with auto-delete now.
+    if (queue && queue->isAutoDelete() && subscribed) {
+        // Make a temporary shared_ptr to prevent premature deletion of queue.
+        // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+        // which could delete the queue while it is still running its destroy logic.
+        boost::shared_ptr<Queue> q(queue);
+        q->releaseFromUse();
+        q->scheduleAutoDelete();
+    }
+}
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Tue Dec 10 14:11:36 2013
@@ -38,6 +38,7 @@ class Queue;
 class QueueRegistry;
 class SessionHandler;
 class Deliverable;
+class ExchangeRegistry;
 }
 
 namespace ha {
@@ -59,9 +60,12 @@ class QueueReplicator : public broker::E
   public:
     static const std::string QPID_SYNC_FREQUENCY;
     static const std::string REPLICATOR_PREFIX;
+    typedef std::vector<boost::shared_ptr<QueueReplicator> > Vector;
 
     static std::string replicatorName(const std::string& queueName);
     static bool isReplicatorName(const std::string&);
+    /** Append every QueueReplicator found in the registry to result. */
+    static void copy(broker::ExchangeRegistry&, Vector& result);
 
     QueueReplicator(HaBroker&,
                     boost::shared_ptr<broker::Queue> q,
@@ -78,7 +82,6 @@ class QueueReplicator : public broker::E
 
     // Set if the queue has ever been subscribed to, used for auto-delete cleanup.
     void setSubscribed() { subscribed = true; }
-    bool isSubscribed() { return subscribed; }
 
     boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
 
@@ -90,6 +93,8 @@ class QueueReplicator : public broker::E
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
     bool hasBindings();
 
+    void promoted();
+
   protected:
     typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
     typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Dec 10 14:11:36 2013
@@ -22,7 +22,6 @@
 #include "Event.h"
 #include "IdSetter.h"
 #include "QueueGuard.h"
-#include "QueueReplicator.h"
 #include "QueueSnapshots.h"
 #include "ReplicatingSubscription.h"
 #include "TxReplicatingSubscription.h"

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1549844&r1=1549843&r2=1549844&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Dec 10 14:11:36 2013
@@ -34,6 +34,14 @@ log = getLogger(__name__)
 class HaBrokerTest(BrokerTest):
     """Base class for HA broker tests"""
 
+def alt_setup(session, suffix):
+    # Create exchange to use as alternate and a queue bound to it.
+    # altex exchange: acts as alternate exchange
+    session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+    # altq queue bound to altex, collect re-routed messages.
+    session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+
 class ReplicationTests(HaBrokerTest):
     """Correctness tests for  HA replication."""
 
@@ -718,19 +726,44 @@ acl deny all all
         except NotFound: pass
         assert not cluster[1].agent().getQueue("q") # Should not be in QMF
 
-    def alt_setup(self, session, suffix):
-        # Create exchange to use as alternate and a queue bound to it.
-        # altex exchange: acts as alternate exchange
-        session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
-        # altq queue bound to altex, collect re-routed messages.
-        session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+    def test_auto_delete_failover(self):
+        """Test auto-delete queues. Verify that:
+        - queues auto-deleted on the primary are deleted on the backup.
+        - auto-delete queues with/without timeout are deleted after a failover.
+        - messages are correctly routed to the alternate exchange.
+        """
+        cluster = HaCluster(self, 3)
+        s = cluster[0].connect().session()
+        def setup(q, timeout=""):
+            if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout
+            # Create alternate exchange, auto-delete queue and queue bound to alt. ex.
+            s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q)
+            qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout))
+            s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q))
+            qs.send(q) # Send a message to the auto-delete queue
+            return s
+
+        for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args)
+        receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues
+        # Note q5 is never subscribed to, so should not be auto-deleted.
+        receivers[3].close()    # Trigger auto-delete for q4
+        cluster[0].kill(final=False)
+        cluster[2].promote()
+        cluster.restart(0)
+        cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout.
+        for i in [2,1,0]:
+            for q in ["q1", "q2", "q3","q4"]:
+                cluster[i].wait_no_queue(q,timeout=2)      # auto-deleted
+                cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate
+            cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted.
+            cluster[i].assert_browse_backup("q5-altq", [])
 
     def test_auto_delete_close(self):
         """Verify auto-delete queues are deleted on backup if auto-deleted
         on primary"""
         cluster=HaCluster(self, 2)
         p = cluster[0].connect().session()
-        self.alt_setup(p, "1")
+        alt_setup(p, "1")
         r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
         s = p.sender("adq1")
         for m in ["aa","bb","cc"]: s.send(m)
@@ -742,71 +775,6 @@ acl deny all all
         cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
         cluster[1].wait_queue("adq2")
 
-    def test_auto_delete_crash(self):
-        """Verify auto-delete queues are deleted on backup if the primary crashes"""
-        cluster=HaCluster(self, 2)
-        p = cluster[0].connect().session()
-        self.alt_setup(p,"1")
-
-        # adq1 is subscribed so will be auto-deleted.
-        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
-        s = p.sender("adq1")
-        for m in ["aa","bb","cc"]: s.send(m)
-        # adq2 is subscribed after cluster[2] starts.
-        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
-        # adq3 is never subscribed.
-        p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
-
-        cluster.start()
-        cluster[2].wait_status("ready")
-
-        p.receiver("adq2")      # Subscribed after cluster[2] joined
-
-        for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
-        for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
-        cluster[0].kill()
-
-        cluster[1].wait_no_queue("adq1")
-        cluster[1].wait_no_queue("adq2")
-        cluster[1].wait_queue("adq3")
-
-        cluster[2].wait_no_queue("adq1")
-        cluster[2].wait_no_queue("adq2")
-        cluster[2].wait_queue("adq3")
-
-        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
-        cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
-
-    def test_auto_delete_timeout(self):
-        cluster = HaCluster(self, 2)
-        # Test timeout
-        r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
-        # Test special case of timeout = 0
-        r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}")
-        cluster[1].wait_queue("q0")
-        cluster[1].wait_queue("q1")
-        cluster[0].kill()
-        cluster[1].wait_queue("q1")    # Not timed out yet
-        cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout
-        cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout
-
-    def test_alt_exchange_dup(self):
-        """QPID-4349: if a queue has an alterante exchange and is deleted the
-        messages appear twice on the alternate, they are rerouted once by the
-        primary and again by the backup."""
-        cluster = HaCluster(self,2)
-
-        # Set up q with alternate exchange altex bound to altq.
-        s = cluster[0].connect().session()
-        s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
-        s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
-        snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}")
-        messages = [ str(n) for n in xrange(10) ]
-        for m in messages: snd.send(m)
-        cluster[1].assert_browse_backup("q", messages)
-        s.sender("q;{delete:always}").close()
-        cluster[1].assert_browse_backup("altq", messages)
-
     def test_expired(self):
         """Regression test for QPID-4379: HA does not properly handle expired messages"""
         # Race between messages expiring and HA replicating consumer.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org