You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/18 18:53:53 UTC

svn commit: r1363014 - in /qpid/trunk/qpid/cpp/src/qpid/ha: Primary.cpp RemoteBackup.cpp RemoteBackup.h

Author: aconway
Date: Wed Jul 18 16:53:52 2012
New Revision: 1363014

URL: http://svn.apache.org/viewvc?rev=1363014&view=rev
Log:
QPID-4148: HA Not setting initial queues for new RemoteBackups 

Fix bug introduced by r1362584:
"QPID-4144 HA broker deadlocks on broker::QueueRegistry lock and ha::Primary lock"
That change stopped setting initial queues on new (i.e. not expected) RemoteBackups.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1363014&r1=1363013&r2=1363014&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Wed Jul 18 16:53:52 2012
@@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const Bro
                 new RemoteBackup(*i, haBroker.getReplicationTest(), false));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
-            backup->createGuards(hb.getBroker().getQueues());
+            backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards
         }
         // Set timeout for expected brokers to connect and become ready.
         sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -187,13 +187,18 @@ void Primary::queueDestroy(const QueuePt
 }
 
 void Primary::opened(broker::Connection& connection) {
-    Mutex::ScopedLock l(lock);
     BrokerInfo info;
-    boost::shared_ptr<RemoteBackup> backup;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+        Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
-            backup.reset(new RemoteBackup(info, haBroker.getReplicationTest(), true));
+            boost::shared_ptr<RemoteBackup> backup(
+                new RemoteBackup(info, haBroker.getReplicationTest(), true));
+            {
+                // Avoid deadlock with queue registry lock.
+                Mutex::ScopedUnlock u(lock);
+                backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
+            }
             backups[info.getSystemId()] = backup;
             QPID_LOG(debug, logPrefix << "New backup connected: " << info);
         }
@@ -207,7 +212,7 @@ void Primary::opened(broker::Connection&
     }
     else
         QPID_LOG(debug, logPrefix << "Accepted client connection "
-                 << connection.getMgmtId())
+                 << connection.getMgmtId());
 }
 
 void Primary::closed(broker::Connection& connection) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1363014&r1=1363013&r2=1363014&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Wed Jul 18 16:53:52 2012
@@ -36,10 +36,10 @@ RemoteBackup::RemoteBackup(const BrokerI
     brokerInfo(info), replicationTest(rt), connected(con)
 {}
 
-void RemoteBackup::createGuards(broker::QueueRegistry& queues)
+void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
 {
-    QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
-    queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
+    QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : ""));
+    queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards));
 }
 
 RemoteBackup::~RemoteBackup() { cancel(); }
@@ -54,10 +54,10 @@ bool RemoteBackup::isReady() {
     return connected && initialQueues.empty();
 }
 
-void RemoteBackup::initialQueue(const QueuePtr& q) {
+void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) {
     if (replicationTest.isReplicated(ALL, *q)) {
         initialQueues.insert(q);
-        queueCreate(q);
+        if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
     }
 }
 
@@ -93,7 +93,7 @@ void RemoteBackup::ready(const QueuePtr&
     if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
 }
 
-// Called via ConfigurationObserver and from initialQueue
+// Called via ConfigurationObserver::queueCreate and from initialQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (replicationTest.isReplicated(ALL, *q))
         guards[q].reset(new QueueGuard(*q, brokerInfo));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1363014&r1=1363013&r2=1363014&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Wed Jul 18 16:53:52 2012
@@ -57,8 +57,10 @@ class RemoteBackup
     RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
     ~RemoteBackup();
 
-    /** Create initial guards for all the replicated queues in the registry. */
-    void createGuards(broker::QueueRegistry&);
+    /** Set the initial queues for all queues in the registry.
+     *@createGuards if true create guards also, if false guards will be created on demand.
+     */
+    void setInitialQueues(broker::QueueRegistry&, bool createGuards);
 
     /** Return guard associated with a queue. Used to create ReplicatingSubscription. */
     GuardPtr guard(const QueuePtr&);
@@ -90,7 +92,7 @@ class RemoteBackup
     typedef std::set<QueuePtr> QueueSet;
 
     /** Add queue to guard as an initial queue */
-    void initialQueue(const QueuePtr&);
+    void initialQueue(const QueuePtr&, bool createGuard);
 
     std::string logPrefix;
     BrokerInfo brokerInfo;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org