You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/03/25 21:03:44 UTC

[qpid-cpp] 01/03: QPID-8287: avoid call holding lock

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit b1f7154e183d90083a79dbe6ba9b58d2e3134def
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Mar 25 19:00:10 2019 +0000

    QPID-8287: avoid call holding lock
---
 src/qpid/ha/QueueReplicator.cpp | 18 +++++++++++++++++-
 src/qpid/ha/QueueReplicator.h   |  2 ++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/qpid/ha/QueueReplicator.cpp b/src/qpid/ha/QueueReplicator.cpp
index c0d2689..21d0801 100644
--- a/src/qpid/ha/QueueReplicator.cpp
+++ b/src/qpid/ha/QueueReplicator.cpp
@@ -241,12 +241,28 @@ void QueueReplicator::destroy(Mutex::ScopedLock&) {
     getBroker()->getExchanges().destroy(getName());
 }
 
+boost::shared_ptr<QueueSnapshot> QueueReplicator::getSnapshot()
+{
+    boost::shared_ptr<broker::Queue> q;
+    {
+        Mutex::ScopedLock l(lock);
+        if (queue) {
+            q = queue;
+        } else {
+            return boost::shared_ptr<QueueSnapshot>();
+        }
+    }
+    return q->getObservers().findType<QueueSnapshot>();
+}
+
 
 // Called in a broker connection thread when the bridge is created.
 // Note: called with the Link lock held.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
+    boost::shared_ptr<QueueSnapshot> qs = getSnapshot();//do outside the lock
     Mutex::ScopedLock l(lock);
     if (!queue) return;         // Already destroyed
+
     sessionHandler = &sessionHandler_;
     if (sessionHandler->getSession()) {
         // Don't overwrite the exchange property set on the primary.
@@ -258,7 +274,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
     arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
     arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
-    boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
     ReplicationIdSet snapshot;
     if (qs) {
         snapshot = qs->getSnapshot();
@@ -307,6 +322,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
             if (j == positions.end()) continue;
             position = j->second;
         }
+        Mutex::ScopedUnlock u(lock);//this method is called under lock, so need to release
         queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
         // positions will be cleaned up in dequeued()
     }
diff --git a/src/qpid/ha/QueueReplicator.h b/src/qpid/ha/QueueReplicator.h
index a4b31b6..6260b45 100644
--- a/src/qpid/ha/QueueReplicator.h
+++ b/src/qpid/ha/QueueReplicator.h
@@ -46,6 +46,7 @@ class ExchangeRegistry;
 
 namespace ha {
 class HaBroker;
+class QueueSnapshot;
 class Settings;
 
 /**
@@ -130,6 +131,7 @@ class QueueReplicator : public broker::Exchange,
     class QueueObserver;
 
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+    boost::shared_ptr<QueueSnapshot> getSnapshot();
 
     // Dispatch functions
     void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org