You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2013/09/23 15:38:46 UTC

svn commit: r1525587 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp Queue.cpp Queue.h QueueRegistry.cpp

Author: jross
Date: Mon Sep 23 13:38:45 2013
New Revision: 1525587

URL: http://svn.apache.org/r1525587
Log:
QPID-5084: Track durable queue ownership across restarts; a patch from Pavel Moravec and Ernie Allen

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Sep 23 13:38:45 2013
@@ -355,6 +355,11 @@ Broker::Broker(const Broker::Options& co
     //recover any objects via object factories
     objects.restore(*this);
 
+    // Assign to queues their users who created them (can be done after ACL is loaded in Plugin::initializeAll above
+    if ((getAcl()) && (store.get())) {
+        queues.eachQueue(boost::bind(&qpid::broker::Queue::updateAclUserQueueCount, _1));
+    }
+
     if(conf.enableMgmt) {
         if (getAcl()) {
             mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Sep 23 13:38:45 2013
@@ -1182,18 +1182,27 @@ void Queue::encode(Buffer& buffer) const
     buffer.putShortString(name);
     buffer.put(encodableSettings);
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
+    buffer.putShortString(userId);
 }
 
 uint32_t Queue::encodedSize() const
 {
     return name.size() + 1/*short string size octet*/
         + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+        + userId.size() + 1 /* short string */
         + encodableSettings.encodedSize();
 }
 
+void Queue::updateAclUserQueueCount()
+{
+  if (broker->getAcl())
+    broker->getAcl()->approveCreateQueue(userId, name);
+}
+
 Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
 {
     string name;
+    string _userId;
     buffer.getShortString(name);
     FieldTable ft;
     buffer.get(ft);
@@ -1207,6 +1216,12 @@ Queue::shared_ptr Queue::restore( QueueR
         result.first->alternateExchangeName.assign(altExch);
     }
 
+    //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized
+    if (buffer.available()) {
+        buffer.getShortString(_userId);
+        result.first->setOwningUser(_userId);
+    }
+
     return result.first;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Sep 23 13:38:45 2013
@@ -204,6 +204,7 @@ class Queue : public boost::enable_share
     QueueDepth current;
     QueueBindings bindings;
     std::string alternateExchangeName;
+    std::string userId; // queue owner for ACL quota purposes
     boost::shared_ptr<Exchange> alternateExchange;
     framing::SequenceNumber sequence;
     qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject;
@@ -384,6 +385,10 @@ class Queue : public boost::enable_share
     /** Get the message at position pos, returns true if found and sets msg */
     QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
 
+    // Remember the queue's owner so acl quotas can be restored after restart
+    void setOwningUser(std::string& _userId) { userId  = _userId; }
+    void updateAclUserQueueCount();
+
     QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
     QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
     QPID_BROKER_EXTERN bool isLocal(const Message& msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1525587&r1=1525586&r2=1525587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Sep 23 13:38:45 2013
@@ -64,6 +64,8 @@ QueueRegistry::declare(const string& nam
             //Move this to factory also?
             if (alternate)
                 queue->setAlternateExchange(alternate);//need to do this *before* create
+            queue->setOwningUser(userId);
+
             if (!recovering) {
                 //create persistent record if required
                 queue->create();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org