You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2013/09/23 15:38:46 UTC
svn commit: r1525587 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp
Queue.cpp Queue.h QueueRegistry.cpp
Author: jross
Date: Mon Sep 23 13:38:45 2013
New Revision: 1525587
URL: http://svn.apache.org/r1525587
Log:
QPID-5084: Track durable queue ownership across restarts; a patch from Pavel Moravec and Ernie Allen
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Sep 23 13:38:45 2013
@@ -355,6 +355,11 @@ Broker::Broker(const Broker::Options& co
//recover any objects via object factories
objects.restore(*this);
+ // Assign to queues their users who created them (can be done after ACL is loaded in Plugin::initializeAll above)
+ if ((getAcl()) && (store.get())) {
+ queues.eachQueue(boost::bind(&qpid::broker::Queue::updateAclUserQueueCount, _1));
+ }
+
if(conf.enableMgmt) {
if (getAcl()) {
mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Sep 23 13:38:45 2013
@@ -1182,18 +1182,27 @@ void Queue::encode(Buffer& buffer) const
buffer.putShortString(name);
buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
+ buffer.putShortString(userId);
}
// Size in bytes of the encoded queue record: three short strings (queue name,
// alternate exchange name, owning userId), each counted as its length plus one
// size octet, plus the encoded size of encodableSettings. An unset alternate
// exchange contributes only its size octet. These are the same fields that
// Queue::encode() puts into the buffer.
uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ + userId.size() + 1 /* short string */
+ encodableSettings.encodedSize();
}
+void Queue::updateAclUserQueueCount()
+{
+ // No ACL module installed: there is nothing to report this queue to.
+ if (!broker->getAcl())
+ return;
+ // Report this queue and its owning user to the ACL module.
+ // NOTE(review): presumably this is what re-counts the queue against the
+ // owner's quota after a restart - confirm against approveCreateQueue.
+ broker->getAcl()->approveCreateQueue(userId, name);
+}
+
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
+ string _userId;
buffer.getShortString(name);
FieldTable ft;
buffer.get(ft);
@@ -1207,6 +1216,12 @@ Queue::shared_ptr Queue::restore( QueueR
result.first->alternateExchangeName.assign(altExch);
}
+ //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized
+ if (buffer.available()) {
+ buffer.getShortString(_userId);
+ result.first->setOwningUser(_userId);
+ }
+
return result.first;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Sep 23 13:38:45 2013
@@ -204,6 +204,7 @@ class Queue : public boost::enable_share
QueueDepth current;
QueueBindings bindings;
std::string alternateExchangeName;
+ std::string userId; // queue owner for ACL quota purposes
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject;
@@ -384,6 +385,10 @@ class Queue : public boost::enable_share
/** Get the message at position pos, returns true if found and sets msg */
QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
+ // Remember the queue's owner so acl quotas can be restored after restart
+ // Record the id of the user that owns this queue. The argument is only
+ // copied into userId, so it is taken by const reference; this also lets
+ // callers pass const strings and temporaries.
+ void setOwningUser(const std::string& _userId) { userId = _userId; }
+ void updateAclUserQueueCount();
+
QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
QPID_BROKER_EXTERN bool isLocal(const Message& msg);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Sep 23 13:38:45 2013
@@ -64,6 +64,8 @@ QueueRegistry::declare(const string& nam
//Move this to factory also?
if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
+ queue->setOwningUser(userId);
+
if (!recovering) {
//create persistent record if required
queue->create();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org