You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/05 18:43:03 UTC
svn commit: r482723 - in /incubator/qpid/trunk/qpid/cpp/lib/broker:
BrokerQueue.cpp BrokerQueue.h MessageStore.h MessageStoreModule.cpp
MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h
QueuePolicy.cpp QueuePolicy.h
Author: gsim
Date: Tue Dec 5 09:43:00 2006
New Revision: 482723
URL: http://svn.apache.org/viewvc?view=rev&rev=482723
Log:
Allow settings to be set and persisted for queues.
Define policy based on these settings.
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Tue Dec 5 09:43:00 2006
@@ -161,12 +161,14 @@
}
void Queue::pop(){
- messages.pop();
+ if (policy.get()) policy->dequeued(messages.front(), store);
+ messages.pop();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
messages.push(msg);
+ if (policy.get()) policy->enqueued(messages.front(), store);
}
u_int32_t Queue::getMessageCount() const{
@@ -206,24 +208,17 @@
void Queue::create(const FieldTable& settings)
{
- //Note: currently field table only contain signed 32 bit ints, which
- // restricts the values that can be set on the queue policy.
- u_int32_t maxCount(0);
- try {
- maxCount = settings.getInt(qpidMaxSize);
- } catch (FieldNotFoundException& ignore) {
- }
- u_int32_t maxSize(0);
- try {
- maxSize = settings.getInt(qpidMaxCount);
- } catch (FieldNotFoundException& ignore) {
- }
- if (maxCount || maxSize) {
- setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
- }
-
if (store) {
- store->create(*this);
+ store->create(*this, settings);
+ }
+ configure(settings);
+}
+
+void Queue::configure(const FieldTable& settings)
+{
+ QueuePolicy* _policy = new QueuePolicy(settings);
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
+ setPolicy(std::auto_ptr<QueuePolicy>(_policy));
}
}
@@ -237,4 +232,9 @@
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
policy = _policy;
+}
+
+const QueuePolicy* const Queue::getPolicy()
+{
+ return policy.get();
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Tue Dec 5 09:43:00 2006
@@ -66,7 +66,7 @@
int64_t lastUsed;
Consumer* exclusive;
mutable u_int64_t persistenceId;
- std::auto_ptr<QueuePolicy> policy;
+ std::auto_ptr<QueuePolicy> policy;
void pop();
void push(Message::shared_ptr& msg);
@@ -86,6 +86,7 @@
~Queue();
void create(const qpid::framing::FieldTable& settings);
+ void configure(const qpid::framing::FieldTable& settings);
void destroy();
/**
* Informs the queue of a binding that should be cancelled on
@@ -135,6 +136,8 @@
* dequeues from memory only
*/
Message::shared_ptr dequeue();
+
+ const QueuePolicy* const getPolicy();
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Tue Dec 5 09:43:00 2006
@@ -22,6 +22,7 @@
#define _MessageStore_
#include <BrokerMessage.h>
+#include <FieldTable.h>
#include <RecoveryManager.h>
#include <TransactionalStore.h>
@@ -45,7 +46,7 @@
/**
* Record the existance of a durable queue
*/
- virtual void create(const Queue& queue) = 0;
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
/**
* Destroy a durable queue
*/
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Tue Dec 5 09:43:00 2006
@@ -28,9 +28,9 @@
{
}
-void MessageStoreModule::create(const Queue& queue)
+void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
{
- store->create(queue);
+ store->create(queue, settings);
}
void MessageStoreModule::destroy(const Queue& queue)
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Tue Dec 5 09:43:00 2006
@@ -36,7 +36,7 @@
qpid::sys::Module<MessageStore> store;
public:
MessageStoreModule(const std::string& name);
- void create(const Queue& queue);
+ void create(const Queue& queue, const qpid::framing::FieldTable& settings);
void destroy(const Queue& queue);
void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
void stage(Message::shared_ptr& msg);
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Tue Dec 5 09:43:00 2006
@@ -30,7 +30,7 @@
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-void NullMessageStore::create(const Queue& queue)
+void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
{
if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Tue Dec 5 09:43:00 2006
@@ -35,7 +35,7 @@
const bool warn;
public:
NullMessageStore(bool warn = true);
- virtual void create(const Queue& queue);
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
virtual void stage(Message::shared_ptr& msg);
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp Tue Dec 5 09:43:00 2006
@@ -21,8 +21,14 @@
#include <QueuePolicy.h>
using namespace qpid::broker;
+using namespace qpid::framing;
-QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {}
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) :
+ maxCount(_maxCount), maxSize(_maxSize) {}
+
+QueuePolicy::QueuePolicy(const FieldTable& settings) :
+ maxCount(getInt(settings, maxCountKey, 0)),
+ maxSize(getInt(settings, maxSizeKey, 0)) {}
void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
{
@@ -47,3 +53,23 @@
return maxSize && (size += msg->contentSize()) > maxSize;
}
+void QueuePolicy::update(FieldTable& settings)
+{
+ if (maxCount) settings.setInt(maxCountKey, maxCount);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+}
+
+
+int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ try {
+ return settings.getInt(key);
+ } catch (FieldNotFoundException& ignore) {
+ return defaultValue;
+ }
+}
+
+const std::string QueuePolicy::maxCountKey("qpid.max_count");
+const std::string QueuePolicy::maxSizeKey("qpid.max_size");
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h?view=diff&rev=482723&r1=482722&r2=482723
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h Tue Dec 5 09:43:00 2006
@@ -22,22 +22,31 @@
#define _QueuePolicy_
#include <BrokerMessage.h>
+#include <FieldTable.h>
namespace qpid {
namespace broker {
class QueuePolicy
{
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+
const u_int32_t maxCount;
const u_int64_t maxSize;
u_int32_t count;
u_int64_t size;
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
bool checkCount(Message::shared_ptr& msg);
bool checkSize(Message::shared_ptr& msg);
public:
QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+ QueuePolicy(const qpid::framing::FieldTable& settings);
void enqueued(Message::shared_ptr& msg, MessageStore* store);
void dequeued(Message::shared_ptr& msg, MessageStore* store);
+ void update(qpid::framing::FieldTable& settings);
+ u_int32_t getMaxCount() const { return maxCount; }
+ u_int64_t getMaxSize() const { return maxSize; }
};
}
}