You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/03/25 21:03:44 UTC
[qpid-cpp] 01/03: QPID-8287: avoid call holding lock
This is an automated email from the ASF dual-hosted git repository.
gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git
commit b1f7154e183d90083a79dbe6ba9b58d2e3134def
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Mar 25 19:00:10 2019 +0000
QPID-8287: avoid call holding lock
---
src/qpid/ha/QueueReplicator.cpp | 18 +++++++++++++++++-
src/qpid/ha/QueueReplicator.h | 2 ++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/src/qpid/ha/QueueReplicator.cpp b/src/qpid/ha/QueueReplicator.cpp
index c0d2689..21d0801 100644
--- a/src/qpid/ha/QueueReplicator.cpp
+++ b/src/qpid/ha/QueueReplicator.cpp
@@ -241,12 +241,28 @@ void QueueReplicator::destroy(Mutex::ScopedLock&) {
getBroker()->getExchanges().destroy(getName());
}
+boost::shared_ptr<QueueSnapshot> QueueReplicator::getSnapshot() // Looks up the QueueSnapshot observer of queue; yields an empty pointer when queue is unset.
+{
+ boost::shared_ptr<broker::Queue> q; // local handle that is still usable after the locked scope below
+ { // inner scope bounds the lifetime of the ScopedLock
+ Mutex::ScopedLock l(lock); // queue is only read while lock is held
+ if (queue) {
+ q = queue; // remember the queue so it can be used once the lock scope has ended
+ } else {
+ return boost::shared_ptr<QueueSnapshot>(); // no queue set: hand back an empty pointer
+ }
+ } // l goes out of scope here
+ return q->getObservers().findType<QueueSnapshot>(); // observer lookup goes through the local q, after the lock scope (cf. commit title "avoid call holding lock")
+}
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
+ boost::shared_ptr<QueueSnapshot> qs = getSnapshot();//do outside the lock
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
+
sessionHandler = &sessionHandler_;
if (sessionHandler->getSession()) {
// Don't overwrite the exchange property set on the primary.
@@ -258,7 +274,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
- boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
ReplicationIdSet snapshot;
if (qs) {
snapshot = qs->getSnapshot();
@@ -307,6 +322,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
if (j == positions.end()) continue;
position = j->second;
}
+ Mutex::ScopedUnlock u(lock);//this method is called under lock, so need to release
queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
// positions will be cleaned up in dequeued()
}
diff --git a/src/qpid/ha/QueueReplicator.h b/src/qpid/ha/QueueReplicator.h
index a4b31b6..6260b45 100644
--- a/src/qpid/ha/QueueReplicator.h
+++ b/src/qpid/ha/QueueReplicator.h
@@ -46,6 +46,7 @@ class ExchangeRegistry;
namespace ha {
class HaBroker;
+class QueueSnapshot;
class Settings;
/**
@@ -130,6 +131,7 @@ class QueueReplicator : public broker::Exchange,
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ boost::shared_ptr<QueueSnapshot> getSnapshot();
// Dispatch functions
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org