You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/05 18:43:03 UTC

svn commit: r482723 - in /incubator/qpid/trunk/qpid/cpp/lib/broker: BrokerQueue.cpp BrokerQueue.h MessageStore.h MessageStoreModule.cpp MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h QueuePolicy.cpp QueuePolicy.h

Author: gsim
Date: Tue Dec  5 09:43:00 2006
New Revision: 482723

URL: http://svn.apache.org/viewvc?view=rev&rev=482723
Log:
Allow settings to be set and persisted for queues.
Define policy based on these settings.


Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Tue Dec  5 09:43:00 2006
@@ -161,12 +161,14 @@
 }
 
 void Queue::pop(){
-    messages.pop();
+    if (policy.get()) policy->dequeued(messages.front(), store); // tell the policy, if one is set, about the head message before it is removed
+    messages.pop();    
 }
 
 void Queue::push(Message::shared_ptr& msg){
     queueing = true;
     messages.push(msg);
+    if (policy.get()) policy->enqueued(msg, store); // account for the message just added (the queue's back); front() is the oldest entry, not this one
 }
 
 u_int32_t Queue::getMessageCount() const{
@@ -206,24 +208,17 @@
 
 void Queue::create(const FieldTable& settings)
 {
-    //Note: currently field table only contain signed 32 bit ints, which
-    //      restricts the values that can be set on the queue policy.
-    u_int32_t maxCount(0);
-    try {
-        maxCount = settings.getInt(qpidMaxSize); 
-    } catch (FieldNotFoundException& ignore) {
-    }
-    u_int32_t maxSize(0);
-    try {
-        maxSize = settings.getInt(qpidMaxCount);
-    } catch (FieldNotFoundException& ignore) {
-    }
-    if (maxCount || maxSize) {
-        setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
-    }
- 
     if (store) {
-        store->create(*this);
+        store->create(*this, settings); // hand the settings to the store together with the queue
+    }
+    configure(settings); // apply the same settings to this queue; runs whether or not a store is present
+}
+
+void Queue::configure(const FieldTable& settings)
+{
+    std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(settings)); // owned from the start so it is freed when no limit is configured
+    if (_policy->getMaxCount() || _policy->getMaxSize()) {
+        setPolicy(_policy); // ownership moves to the queue only when at least one limit is set
     }
 }
 
@@ -237,4 +232,9 @@
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
 {
     policy = _policy;
+}
+
+const QueuePolicy* const Queue::getPolicy()
+{
+    return policy.get();
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Tue Dec  5 09:43:00 2006
@@ -66,7 +66,7 @@
             int64_t lastUsed;
             Consumer* exclusive;
             mutable u_int64_t persistenceId;
-            std::auto_ptr<QueuePolicy> policy;
+            std::auto_ptr<QueuePolicy> policy;            
 
             void pop();
             void push(Message::shared_ptr& msg);
@@ -86,6 +86,7 @@
             ~Queue();
 
             void create(const qpid::framing::FieldTable& settings);
+            void configure(const qpid::framing::FieldTable& settings);
             void destroy();
             /**
              * Informs the queue of a binding that should be cancelled on
@@ -135,6 +136,8 @@
              * dequeues from memory only
              */
             Message::shared_ptr dequeue();
+
+            const QueuePolicy* const getPolicy();
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Tue Dec  5 09:43:00 2006
@@ -22,6 +22,7 @@
 #define _MessageStore_
 
 #include <BrokerMessage.h>
+#include <FieldTable.h>
 #include <RecoveryManager.h>
 #include <TransactionalStore.h>
 
@@ -45,7 +46,7 @@
             /**
              * Record the existance of a durable queue
              */
-            virtual void create(const Queue& queue) = 0;
+            virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
             /**
              * Destroy a durable queue
              */

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Tue Dec  5 09:43:00 2006
@@ -28,9 +28,9 @@
 {
 }
 
-void MessageStoreModule::create(const Queue& queue)
+void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
 {
-    store->create(queue);
+    store->create(queue, settings);
 }
 
 void MessageStoreModule::destroy(const Queue& queue)

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Tue Dec  5 09:43:00 2006
@@ -36,7 +36,7 @@
             qpid::sys::Module<MessageStore> store;
         public:
             MessageStoreModule(const std::string& name);
-            void create(const Queue& queue);
+            void create(const Queue& queue, const qpid::framing::FieldTable& settings);
             void destroy(const Queue& queue);
             void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
             void stage(Message::shared_ptr& msg);

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Tue Dec  5 09:43:00 2006
@@ -30,7 +30,7 @@
 
 NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
 
-void NullMessageStore::create(const Queue& queue)
+void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
 {
     if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Tue Dec  5 09:43:00 2006
@@ -35,7 +35,7 @@
             const bool warn;
         public:
             NullMessageStore(bool warn = true);
-            virtual void create(const Queue& queue);
+            virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
             virtual void destroy(const Queue& queue);
             virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
             virtual void stage(Message::shared_ptr& msg);

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp Tue Dec  5 09:43:00 2006
@@ -21,8 +21,14 @@
 #include <QueuePolicy.h>
 
 using namespace qpid::broker;
+using namespace qpid::framing;
 
-QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {}
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : 
+    maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} // running totals start at zero; previously they were left uninitialised
+
+QueuePolicy::QueuePolicy(const FieldTable& settings) : // limits come from the settings table; a missing key yields 0
+    maxCount(getInt(settings, maxCountKey, 0)), 
+    maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {} // running totals start at zero; previously they were left uninitialised
 
 void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
 {
@@ -47,3 +53,23 @@
     return maxSize && (size += msg->contentSize()) > maxSize;
 }
 
+void QueuePolicy::update(FieldTable& settings) // writes each non-zero limit into settings under the policy's key
+{
+    if (maxCount) settings.setInt(maxCountKey, maxCount);
+    if (maxSize) settings.setInt(maxSizeKey, maxSize);    // NOTE(review): maxSize is 64-bit; setInt presumably stores a signed 32-bit int — confirm large values are not truncated
+}
+
+
+int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue) // reads an int setting; returns defaultValue when the lookup throws FieldNotFoundException
+{
+    //Note: currently field table only contain signed 32 bit ints, which
+    //      restricts the values that can be set on the queue policy.
+    try {
+        return settings.getInt(key); 
+    } catch (FieldNotFoundException& ignore) {
+        return defaultValue;
+    }
+}
+
+const std::string QueuePolicy::maxCountKey("qpid.max_count");
+const std::string QueuePolicy::maxSizeKey("qpid.max_size");

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h Tue Dec  5 09:43:00 2006
@@ -22,22 +22,31 @@
 #define _QueuePolicy_
 
 #include <BrokerMessage.h>
+#include <FieldTable.h>
 
 namespace qpid {
     namespace broker {
         class QueuePolicy
         {
+            static const std::string maxCountKey;
+            static const std::string maxSizeKey;
+
             const u_int32_t maxCount;
             const u_int64_t maxSize;
             u_int32_t count;
             u_int64_t size;
             
+            static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
             bool checkCount(Message::shared_ptr& msg);
             bool checkSize(Message::shared_ptr& msg);
         public:
             QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+            QueuePolicy(const qpid::framing::FieldTable& settings);
             void enqueued(Message::shared_ptr& msg, MessageStore* store);
             void dequeued(Message::shared_ptr& msg, MessageStore* store);
+            void update(qpid::framing::FieldTable& settings);
+            u_int32_t getMaxCount() const { return maxCount; }
+            u_int64_t getMaxSize() const { return maxSize; }           
         };
     }
 }