You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/09/03 18:07:21 UTC

svn commit: r1519738 - in /qpid/trunk/qpid/cpp/src/qpid/ha: BrokerReplicator.cpp BrokerReplicator.h PrimaryTxObserver.cpp

Author: aconway
Date: Tue Sep  3 16:07:20 2013
New Revision: 1519738

URL: http://svn.apache.org/r1519738
Log:
NO-JIRA: HA minor cleanup of disconnect logic.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1519738&r1=1519737&r2=1519738&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Sep  3 16:07:20 2013
@@ -878,54 +878,41 @@ bool BrokerReplicator::isBound(boost::sh
 
 string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
 
-void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
     boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
-    if (!qr) return;
-    assert(qr);
-    if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
-        if (qr->getQueue()->getSettings().autoDeleteDelay) {
-            // Start the auto-delete timer
-            qr->getQueue()->releaseFromUse();
-            qr->getQueue()->scheduleAutoDelete();
+    if (qr) {
+        qr->disconnect();
+        if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+            // Transactions are aborted on failover so clean up tx-queues
+            deleteQueue(qr->getQueue()->getName());
         }
-        else {
-            // Delete immediately. Don't purge, the primary is gone so we need
-            // to reroute the deleted messages.
-            deleteQueue(qr->getQueue()->getName(), false);
+        else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+            if (qr->getQueue()->getSettings().autoDeleteDelay) {
+                // Start the auto-delete timer
+                qr->getQueue()->releaseFromUse();
+                qr->getQueue()->scheduleAutoDelete();
+            }
+            else {
+                // Delete immediately. Don't purge, the primary is gone so we need
+                // to reroute the deleted messages.
+                deleteQueue(qr->getQueue()->getName(), false);
+            }
         }
     }
 }
 
 typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
-typedef vector<boost::shared_ptr<Queue> > QueueVector;
 
 // Called by ConnectionObserver::disconnected, disconnected from the network side.
 void BrokerReplicator::disconnected() {
     QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
     connection = 0;
 
-    // Make copys of queues & exchanges so we can work outside the registry lock.
-
+    // Make copy of exchanges so we can work outside the registry lock.
     ExchangeVector exs;
     exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1));
     for_each(exs.begin(), exs.end(),
-             boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
-
-    QueueVector qs;
-    queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1));
-    for_each(qs.begin(), qs.end(),
-             boost::bind(&BrokerReplicator::disconnectedQueue, this, _1));
-}
-
-// Called for queues existing when the backup is disconnected.
-void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) {
-    QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName());
-    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName());
-    if (qr) {
-        qr->disconnect();
-        if (TxReplicator::isTxQueue(q->getName()))
-            deleteQueue(q->getName());
-    }
+             boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
 }
 
 void BrokerReplicator::setMembership(const Variant::List& brokers) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1519738&r1=1519737&r2=1519738&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Sep  3 16:07:20 2013
@@ -104,7 +104,6 @@ class BrokerReplicator : public broker::
     void connected(broker::Bridge&, broker::SessionHandler&);
     void existingQueue(const boost::shared_ptr<broker::Queue>&);
     void existingExchange(const boost::shared_ptr<broker::Exchange>&);
-    void disconnectedQueue(const boost::shared_ptr<broker::Queue>&);
 
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
@@ -140,8 +139,7 @@ class BrokerReplicator : public broker::
     void deleteQueue(const std::string& name, bool purge=true);
     void deleteExchange(const std::string& name);
 
-    void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
-
+    void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
     void disconnected();
 
     void setMembership(const types::Variant::List&); // Set membership from list.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1519738&r1=1519737&r2=1519738&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Sep  3 16:07:20 2013
@@ -83,9 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     logPrefix = "Primary transaction "+shortStr(id)+": ";
 
     // The brokers known at this point are the ones that will be included
-    // in the transaction. Brokers that join later are not included
-    // Latecomers that have replicated the transaction will be rolled back
-    // when the tx-queue is deleted.
+    // in the transaction. Brokers that join later are not included.
     //
     BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
     std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
@@ -102,7 +100,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaB
     assert(result.second);
     txQueue = result.first;
     txQueue->deliver(TxMembersEvent(members).message());
-    // Do this last, it will start concurrent callbacks.
 }
 
 PrimaryTxObserver::~PrimaryTxObserver() {}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org