You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC
svn commit: r1377715 [5/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/
cpp/src/qpid/asyncStore/ cpp/src/qpid...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Aug 27 15:40:33 2012
@@ -20,7 +20,9 @@
*/
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -57,34 +59,6 @@ namespace {
<< "=" << max));
}
}
-
- /** extract a capacity value as passed in an argument map
- */
- uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
- {
- FieldTable::ValuePtr v = settings.get(key);
-
- int64_t result = 0;
-
- if (!v) return defaultValue;
- if (v->getType() == 0x23) {
- QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
- } else if (v->getType() == 0x33) {
- QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int64_t>()) {
- result = v->get<int64_t>();
- QPID_LOG(debug, "Got integer value for " << key << ": " << result);
- if (result >= 0) return result;
- } else if (v->convertsTo<std::string>()) {
- std::string s(v->get<std::string>());
- QPID_LOG(debug, "Got string value for " << key << ": " << s);
- std::istringstream convert(s);
- if (convert >> result && result >= 0) return result;
- }
-
- QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
- return defaultValue;
- }
}
@@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
if (queue) {
queueName = _queue->getName();
- if (queue->getPolicy()) {
- maxSize = _queue->getPolicy()->getMaxSize();
- maxCount = _queue->getPolicy()->getMaxCount();
- }
+ if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
+ if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
if (queueMgmtObj) {
@@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit()
sys::Mutex::ScopedLock l(indexLock);
if (!index.empty()) {
// we're gone - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
try {
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
} catch (...) {} // ignore - not safe for a destructor to throw.
index.clear();
}
}
-void QueueFlowLimit::enqueued(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.payload->contentSize();
+ size += msg.getContentSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const Queu
if (flowStopped || !index.empty()) {
// ignore flow control if we are populating the queue due to cluster replication:
if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+ QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
return;
}
- QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
- msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
+ QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
+ msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}
@@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const Queu
-void QueueFlowLimit::dequeued(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
@@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const Queu
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.payload->contentSize();
+ uint64_t _size = msg.getContentSize();
if (_size <= size) {
size -= _size;
} else {
@@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const Queu
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+ std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence());
if (itr != index.end()) { // this msg is flow controlled, release it:
- msg.payload->getIngressCompletion().finishCompleter();
+ msg.getPersistentContext()->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_
}
-void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
{
QueueFlowLimit *ptr = createLimit( &queue, settings );
if (ptr) {
@@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queu
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
{
- std::string type(QueuePolicy::getType(settings));
-
- if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+ if (settings.dropMessagesAtLimit) {
// The size of a RING queue is limited by design - no need for flow control.
return 0;
}
- if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
- settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+ if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
// user provided (some) flow settings manually...
- uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
- uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
- uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
- uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
- if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
+ if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
+ return new QueueFlowLimit(queue,
+ settings.flowStop.getCount(),
+ settings.flowResume.getCount(),
+ settings.flowStop.getSize(),
+ settings.flowResume.getSize());
+ } else {
+ //don't have a non-zero value for either the count or the
+ //size to stop at, yet at least one of these settings was
+ //provided, i.e it was set to 0 explicitly which we treat
+ //as turning it off
return 0;
}
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
if (defaultFlowStopRatio) { // broker has a default ratio setup...
- uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+ uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default
+ uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
return 0;
@@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::fram
framing::SequenceSet ss;
if (!index.empty()) {
/* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
itr != index.end(); ++itr) {
ss.add(itr->first);
}
@@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg;
+ Message msg;
queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.h Mon Aug 27 15:40:33 2012
@@ -26,9 +26,9 @@
#include <iostream>
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -45,6 +45,8 @@ namespace qpid {
namespace broker {
class Broker;
+class Queue;
+struct QueueSettings;
/**
* Producer flow control: when level is > flowStop*, flow control is ON.
@@ -80,13 +82,13 @@ class Broker;
QPID_BROKER_EXTERN virtual ~QueueFlowLimit();
- /** the queue has added QueuedMessage. Returns true if flow state changes */
- QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
- /** the queue has removed QueuedMessage. Returns true if flow state changes */
- QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+ /** the queue has added QueuedMessage */
+ QPID_BROKER_EXTERN void enqueued(const Message&);
+ /** the queue has removed QueuedMessage */
+ QPID_BROKER_EXTERN void dequeued(const Message&);
/** ignored */
- QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
- QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
+ QPID_BROKER_EXTERN void acquired(const Message&) {};
+ QPID_BROKER_EXTERN void requeued(const Message&) {};
/** for clustering: */
QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
@@ -106,14 +108,14 @@ class Broker;
void decode(framing::Buffer& buffer);
uint32_t encodedSize() const;
- static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
protected:
// msgs waiting for flow to become available.
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
+ std::map<framing::SequenceNumber, Message > index;
mutable qpid::sys::Mutex indexLock;
_qmfBroker::Queue *queueMgmtObj;
@@ -123,7 +125,7 @@ class Broker;
QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize);
- static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueObserver.h Mon Aug 27 15:40:33 2012
@@ -24,8 +24,8 @@
namespace qpid {
namespace broker {
-struct QueuedMessage;
class Consumer;
+class Message;
/**
* Interface for notifying classes who want to act as 'observers' of a queue of particular
@@ -63,10 +63,10 @@ class QueueObserver
virtual ~QueueObserver() {}
// note: the Queue will hold the messageLock while calling these methods!
- virtual void enqueued(const QueuedMessage&) = 0;
- virtual void dequeued(const QueuedMessage&) = 0;
- virtual void acquired(const QueuedMessage&) = 0;
- virtual void requeued(const QueuedMessage&) = 0;
+ virtual void enqueued(const Message&) = 0;
+ virtual void dequeued(const Message&) = 0;
+ virtual void acquired(const Message&) = 0;
+ virtual void requeued(const Message&) = 0;
virtual void consumerAdded( const Consumer& ) {};
virtual void consumerRemoved( const Consumer& ) {};
private:
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp Mon Aug 27 15:40:33 2012
@@ -21,7 +21,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -32,50 +31,43 @@ using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
-QueueRegistry::QueueRegistry(Broker* b) :
- counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
+QueueRegistry::QueueRegistry(Broker* b)
+{
+ setBroker(b);
+}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const OwnershipToken* owner,
+QueueRegistry::declare(const string& name, const QueueSettings& settings,
boost::shared_ptr<Exchange> alternate,
- const qpid::framing::FieldTable& arguments,
bool recovering/*true if this declare is a
result of recovering queue
- definition from persistente
+ definition from persistent
record*/)
{
- Queue::shared_ptr queue;
std::pair<Queue::shared_ptr, bool> result;
{
RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
QueueMap::iterator i = queues.find(name);
-
if (i == queues.end()) {
- queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ Queue::shared_ptr queue = create(name, settings);
+ //Move this to factory also?
if (alternate) {
queue->setAlternateExchange(alternate);//need to do this *before* create
alternate->incAlternateUsers();
}
if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
- } else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ //create persistent record if required
+ queue->create();
}
queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
}
- if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
@@ -85,11 +77,11 @@ void QueueRegistry::destroy(const string
qpid::sys::RWlock::ScopedWlock locker(lock);
QueueMap::iterator i = queues.find(name);
if (i != queues.end()) {
- Queue::shared_ptr q = i->second;
+ q = i->second;
queues.erase(i);
}
}
- if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
+ if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
@@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(con
return q;
}
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
-
void QueueRegistry::setStore (MessageStore* _store)
{
- store = _store;
+ QueueFactory::setStore(_store);
}
-MessageStore* QueueRegistry::getStore() const {
- return store;
+MessageStore* QueueRegistry::getStore() const
+{
+ return QueueFactory::getStore();
}
-void QueueRegistry::updateQueueClusterState(bool _lastNode)
+void QueueRegistry::setParent(qpid::management::Manageable* _parent)
{
- RWlock::ScopedRlock locker(lock);
- for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
- if (_lastNode){
- i->second->setLastNodeFailure();
- } else {
- i->second->clearLastNodeFailure();
- }
- }
- lastNode = _lastNode;
+ QueueFactory::setParent(_parent);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,8 @@
#define _QueueRegistry_
#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueFactory.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/management/Manageable.h"
#include "qpid/framing/FieldTable.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -34,11 +34,8 @@ namespace qpid {
namespace broker {
class Queue;
-class QueueEvents;
class Exchange;
class OwnershipToken;
-class Broker;
-class MessageStore;
/**
* A registry of queues indexed by queue name.
@@ -47,7 +44,7 @@ class MessageStore;
* are deleted when and only when they are no longer in use.
*
*/
-class QueueRegistry {
+class QueueRegistry : QueueFactory {
public:
QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
QPID_BROKER_EXTERN ~QueueRegistry();
@@ -60,11 +57,8 @@ class QueueRegistry {
*/
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
const std::string& name,
- bool durable = false,
- bool autodelete = false,
- const OwnershipToken* owner = 0,
+ const QueueSettings& settings,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
- const qpid::framing::FieldTable& args = framing::FieldTable(),
bool recovering = false);
/**
@@ -101,11 +95,6 @@ class QueueRegistry {
QPID_BROKER_EXTERN boost::shared_ptr<Queue> get(const std::string& name);
/**
- * Generate unique queue name.
- */
- std::string generateName();
-
- /**
* Set the store to use. May only be called once.
*/
void setStore (MessageStore*);
@@ -118,7 +107,7 @@ class QueueRegistry {
/**
* Register the manageable parent for declared queues
*/
- void setParent (management::Manageable* _parent) { parent = _parent; }
+ void setParent (management::Manageable*);
/** Call f for each queue in the registry. */
template <class F> void eachQueue(F f) const {
@@ -127,22 +116,10 @@ class QueueRegistry {
f(i->second);
}
- /**
- * Change queue mode when cluster size drops to 1 node, expands again
- * in practice allows flow queue to disk when last name to be exectuted
- */
- void updateQueueClusterState(bool lastNode);
-
private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
mutable qpid::sys::RWlock lock;
- int counter;
- MessageStore* store;
- QueueEvents* events;
- management::Manageable* parent;
- bool lastNode; //used to set mode on queue declare
- Broker* broker;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueuedMessage.h Mon Aug 27 15:40:33 2012
@@ -22,8 +22,8 @@
#define _QueuedMessage_
#include "qpid/broker/Message.h"
-#include "BrokerImportExport.h"
-#include <iosfwd>
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/broker/BrokerImportExport.h"
namespace qpid {
namespace broker {
@@ -32,20 +32,19 @@ class Queue;
struct QueuedMessage
{
- boost::intrusive_ptr<Message> payload;
+ Message message;
framing::SequenceNumber position;
- typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
- Status status;
+ enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
Queue* queue;
- QueuedMessage(Queue* q=0,
- boost::intrusive_ptr<Message> msg=0,
- framing::SequenceNumber sn=0,
- Status st=AVAILABLE
- ) : payload(msg), position(sn), status(st), queue(q) {}
+ QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q, Message msg, framing::SequenceNumber sn) :
+ message(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
};
-inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
+{
return a.position < b.position;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.cpp Mon Aug 27 15:40:33 2012
@@ -22,10 +22,9 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredDequeue.h"
-using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
{
queue->recoverPrepared(msg);
}
@@ -38,11 +37,11 @@ bool RecoveredDequeue::prepare(Transacti
void RecoveredDequeue::commit() throw()
{
- queue->enqueueAborted(msg);
+ queue->dequeueCommited(msg);
}
void RecoveredDequeue::rollback() throw()
{
- queue->process(msg);
+ queue->dequeueAborted(msg);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredDequeue.h Mon Aug 27 15:40:33 2012
@@ -26,8 +26,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
-#include <boost/intrusive_ptr.hpp>
-
#include <algorithm>
#include <functional>
#include <list>
@@ -36,18 +34,17 @@ namespace qpid {
namespace broker {
class RecoveredDequeue : public TxOp{
boost::shared_ptr<Queue> queue;
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+ RecoveredDequeue(boost::shared_ptr<Queue> queue, Message msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
boost::shared_ptr<Queue> getQueue() const { return queue; }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ Message getMessage() const { return msg; }
};
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.cpp Mon Aug 27 15:40:33 2012
@@ -22,10 +22,9 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredEnqueue.h"
-using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
{
queue->recoverPrepared(msg);
}
@@ -36,7 +35,7 @@ bool RecoveredEnqueue::prepare(Transacti
}
void RecoveredEnqueue::commit() throw(){
- queue->process(msg);
+ queue->enqueueCommited(msg);
}
void RecoveredEnqueue::rollback() throw(){
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveredEnqueue.h Mon Aug 27 15:40:33 2012
@@ -26,8 +26,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
-#include <boost/intrusive_ptr.hpp>
-
#include <algorithm>
#include <functional>
#include <list>
@@ -36,19 +34,17 @@ namespace qpid {
namespace broker {
class RecoveredEnqueue : public TxOp{
boost::shared_ptr<Queue> queue;
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+ RecoveredEnqueue(boost::shared_ptr<Queue> queue, Message msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredEnqueue(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
boost::shared_ptr<Queue> getQueue() const { return queue; }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
-
+ Message getMessage() const { return msg; }
};
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Aug 27 15:40:33 2012
@@ -21,11 +21,13 @@
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
using boost::dynamic_pointer_cast;
@@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImp
class RecoverableMessageImpl : public RecoverableMessage
{
- intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
+ RecoverableMessageImpl(const Message& _msg);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
@@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryMan
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- boost::intrusive_ptr<Message> message(new Message());
- message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
+ //TODO: determine encoding/version actually used
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
@@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComple
exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
- if (!msg->isPersistent()) {
- msg->forcePersistent(); // set so that message will get dequeued from store.
- }
-}
+RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {}
bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
{
@@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent
void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
{
- msg->decodeContent(buffer);
+ msg.getPersistentContext()->decodeContent(buffer);
}
void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
@@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Que
void RecoverableMessageImpl::setPersistenceId(uint64_t id)
{
- msg->setPersistenceId(id);
+ msg.getPersistentContext()->setPersistenceId(id);
}
void RecoverableMessageImpl::setRedelivered()
{
- msg->redeliver();
+ msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
@@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenc
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp Mon Aug 27 15:40:33 2012
@@ -29,7 +29,7 @@
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SequenceSet.h"
@@ -65,9 +65,8 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
+SemanticState::SemanticState(SessionState& ss)
: session(ss),
- deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
@@ -89,7 +88,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
- recover(true);
+ requeue();
//now unsubscribe, which may trigger queue deletion and thus
//needs to occur after the requeueing of unacked messages
@@ -124,7 +123,7 @@ void SemanticState::consume(const string
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
c = ConsumerImpl::shared_ptr(
- new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
@@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _acquire,
+ SubscriptionType type,
bool _exclusive,
const string& _tag,
const string& _resumeId,
@@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImp
const framing::FieldTable& _arguments
) :
- Consumer(_name, _acquire),
+Consumer(_name, type),
parent(_parent),
queue(_queue),
ackExpected(ack),
- acquire(_acquire),
+ acquire(type == CONSUMER),
blocked(true),
exclusive(_exclusive),
resumeId(_resumeId),
@@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerI
return &(parent->session);
}
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+{
+ return deliver(cursor, msg, shared_from_this());
+}
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
assertClusterSafe();
- allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
- shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
+ allocateCredit(msg);
+ DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+ consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- parent->deliver(record, sync);
+ const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
+
+ record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+ ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
+ acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
+ msg.getAnnotations(),
+ sync));
if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- msg.queue->dequeue(0, msg);
+ queue->dequeue(0 /*ctxt*/, cursor);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
+bool SemanticState::ConsumerImpl::filter(const Message&)
{
return true;
}
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::accept(const Message& msg)
{
assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
@@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const Co
}
}
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
assertClusterSafe();
Credit original = credit;
- credit.consume(1, msg->getRequiredCredit());
+ credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
}
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
{
- bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+ bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
- << " credit for message of " << msg->getRequiredCredit() << " bytes: "
+ << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
<< credit);
return enoughCredit;
}
@@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl
session.getConnection().outputTasks.removeOutputTask(c.get());
}
-
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
disable(c);
@@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl:
c->cancel();
}
-void SemanticState::handle(intrusive_ptr<Message> msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- if (msg->isContentReleaseRequested()) {
- // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
- // presence of these messages). Do not change these without also checking these tests.
- if (msg->isContentReleaseBlocked()) {
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
- } else {
- msg->releaseContent();
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released");
- }
- }
- }
-}
-
-namespace
+TxBuffer* SemanticState::getTxBuffer()
{
-const std::string nullstring;
+ return txBuffer.get();
}
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+void SemanticState::route(Message& msg, Deliverable& strategy) {
+ msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
- std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName
- || cacheExchange->isDestroyed())
- {
+ std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
- cacheExchange->setProperties(msg);
/* verify the userid if specified: */
- std::string id =
- msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+ std::string id = msg.getUserId();
if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<
AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
- if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
+ if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() ))
throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
- exchangeName << " with routing-key " << msg->getRoutingKey()));
+ exchangeName << " with routing-key " << msg.getRoutingKey()));
}
cacheExchange->route(strategy);
@@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy);
}
- if (!strategy.delivered) {
- msg->destroy();
- }
}
}
@@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::comple
}
}
-void SemanticState::recover(bool requeue)
+void SemanticState::requeue()
{
- if(requeue){
- //take copy and clear unacked as requeue may result in redelivery to this session
- //which will in turn result in additions to unacked
- DeliveryRecords copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
- //unconfirmed messages re redelivered and therefore have their
- //id adjusted, confirmed messages are not and so the ordering
- //w.r.t id is lost
- sort(unacked.begin(), unacked.end());
- }
+ //take copy and clear unacked as requeue may result in redelivery to this session
+ //which will in turn result in additions to unacked
+ DeliveryRecords copy = unacked;
+ unacked.clear();
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
getSession().setUnackedCount(unacked.size());
}
-void SemanticState::deliver(DeliveryRecord& msg, bool sync)
-{
- return deliveryAdapter.deliver(msg, sync);
-}
+
+SessionContext& SemanticState::getSession() { return session; }
+const SessionContext& SemanticState::getSession() const { return session; }
+
const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h Mon Aug 27 15:40:33 2012
@@ -26,7 +26,6 @@
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Credit.h"
#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
@@ -34,12 +33,15 @@
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/TxBuffer.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
@@ -47,13 +49,15 @@
#include <vector>
#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
namespace qpid {
namespace broker {
+class Exchange;
+class MessageStore;
class SessionContext;
+class SessionState;
/**
*
@@ -94,28 +98,28 @@ class SemanticState : private boost::non
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription* mgmtObject;
- bool checkCredit(boost::intrusive_ptr<Message>& msg);
- void allocateCredit(boost::intrusive_ptr<Message>& msg);
+ bool checkCredit(const Message& msg);
+ void allocateCredit(const Message& msg);
bool haveCredit();
protected:
QPID_BROKER_EXTERN virtual bool doDispatch();
size_t unacked() { return parent->unacked.size(); }
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- QPID_BROKER_EXTERN ConsumerImpl(
- SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
- QPID_BROKER_EXTERN virtual ~ConsumerImpl();
+ QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, SubscriptionType type, bool exclusive,
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
+ QPID_BROKER_EXTERN ~ConsumerImpl();
QPID_BROKER_EXTERN OwnershipToken* getSession();
- QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg);
- QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+ QPID_BROKER_EXTERN bool filter(const Message&);
+ QPID_BROKER_EXTERN bool accept(const Message&);
QPID_BROKER_EXTERN void cancel() {}
QPID_BROKER_EXTERN void disableNotify();
@@ -153,7 +157,7 @@ class SemanticState : private boost::non
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
- void acknowledged(const broker::QueuedMessage&) {}
+ void acknowledged(const DeliveryRecord&) {}
// manageable entry points
QPID_BROKER_EXTERN management::ManagementObject*
@@ -168,8 +172,7 @@ class SemanticState : private boost::non
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
- SessionContext& session;
- DeliveryAdapter& deliveryAdapter;
+ SessionState& session;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
DeliveryRecords unacked;
@@ -185,7 +188,6 @@ class SemanticState : private boost::non
//needed for queue delete events in auto-delete:
const std::string connectionId;
- void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -196,11 +198,11 @@ class SemanticState : private boost::non
public:
- SemanticState(DeliveryAdapter&, SessionContext&);
+ SemanticState(SessionState&);
~SemanticState();
- SessionContext& getSession() { return session; }
- const SessionContext& getSession() const { return session; }
+ SessionContext& getSession();
+ const SessionContext& getSession() const;
const ConsumerImpl::shared_ptr find(const std::string& destination) const;
bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
@@ -239,12 +241,12 @@ class SemanticState : private boost::non
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void recover(bool requeue);
- void deliver(DeliveryRecord& message, bool sync);
+ TxBuffer* getTxBuffer();
+ void requeue();
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
- void handle(boost::intrusive_ptr<Message> msg);
+ void route(Message& msg, Deliverable& strategy);
void completed(const framing::SequenceSet& commands);
void accepted(const framing::SequenceSet& commands);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp Mon Aug 27 15:40:33 2012
@@ -264,7 +264,7 @@ QueueQueryResult SessionAdapter::QueueHa
queue->isDurable(),
queue->hasExclusiveOwner(),
queue->isAutoDelete(),
- queue->getSettings(),
+ queue->getEncodableSettings(),
queue->getMessageCount(),
queue->getConsumerCount());
} else {
@@ -294,19 +294,24 @@ void SessionAdapter::QueueHandlerImpl::d
queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
+ QueueSettings settings(durable, autoDelete);
+ try {
+ settings.populate(arguments, settings.storeSettings);
+ } catch (const qpid::types::Exception& e) {
+ throw InvalidArgumentException(e.what());
+ }
+
std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().createQueue(name, durable,
- autoDelete,
+ getBroker().createQueue(name, settings,
exclusive ? &session : 0,
alternateExchange,
- arguments,
getConnection().getUserId(),
getConnection().getUrl());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
//handle automatic cleanup:
- if (exclusive) {
+ if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
} else {
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+ : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out),
clusterOrderProxy(c.getClusterOrderOutput() ?
@@ -75,7 +75,7 @@ ConnectionState& SessionHandler::getConn
const ConnectionState& SessionHandler::getConnection() const { return connection; }
void SessionHandler::handleDetach() {
- amqp_0_10::SessionHandler::handleDetach();
+ qpid::amqp_0_10::SessionHandler::handleDetach();
assert(&connection.getChannel(channel.get()) == this);
if (session.get())
connection.getBroker().getSessionManager().detach(session);
@@ -125,7 +125,7 @@ void SessionHandler::attached(const std:
{
if (session.get()) {
session->addManagementObject(); // Delayed from attachAs()
- amqp_0_10::SessionHandler::attached(name);
+ qpid::amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h Mon Aug 27 15:40:33 2012
@@ -41,7 +41,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public amqp_0_10::SessionHandler {
+class SessionHandler : public qpid::amqp_0_10::SessionHandler {
public:
class ErrorListener {
public:
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp Mon Aug 27 15:40:33 2012
@@ -21,6 +21,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
@@ -28,6 +29,7 @@
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -55,9 +57,8 @@ SessionState::SessionState(
const SessionState::Configuration& config, bool delayManagement)
: qpid::SessionState(id, config),
broker(b), handler(&h),
- semanticState(*this, *this),
+ semanticState(*this),
adapter(semanticState),
- msgBuilder(&broker.getStore()),
mgmtObject(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
@@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFram
{
if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFram
header.setEof(false);
msg->getFrames().append(header);
}
+ DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
- msg->setTimestamp();
- msg->setPublisher(&getConnection());
+ deliverable.getMessage().setTimestamp();
+ deliverable.getMessage().setPublisher(&getConnection());
+
+
+ IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().begin();
- semanticState.handle(msg);
+ semanticState.route(deliverable.getMessage(), deliverable);
msgBuilder.end();
- IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
}
@@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& f
handler->out(frame);
}
-void SessionState::deliver(DeliveryRecord& msg, bool sync)
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
- msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
+
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+ method.setEof(false);
+ getProxy().getHandler().handle(method);
+ message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+ message.sendContent(getProxy().getHandler(), maxFrameSize);
+
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
if (sync) {
AMQP_ClientProxy::Execution& p(getProxy().getExecution());
Proxy::ScopedSync s(p);
p.sync();
}
+ return commandId;
}
void SessionState::sendCompletion() {
@@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSy
}
}
-
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
@@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter
/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+ std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
bool unique = pendingMsgs.insert(item).second;
if (!unique) {
assert(false);
@@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter
/** done when an execution.sync arrives */
void SessionState::AsyncCommandCompleter::flushPendingMessages()
{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
}
// drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h Mon Aug 27 15:40:33 2012
@@ -23,17 +23,18 @@
*/
#include "qpid/SessionState.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
-#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
@@ -58,7 +59,6 @@ namespace broker {
class Broker;
class ConnectionState;
-class Message;
class SessionHandler;
class SessionManager;
@@ -68,7 +68,6 @@ class SessionManager;
*/
class SessionState : public qpid::SessionState,
public SessionContext,
- public DeliveryAdapter,
public management::Manageable,
public framing::FrameHandler::InOutHandler
{
@@ -105,8 +104,10 @@ class SessionState : public qpid::Sessio
void sendCompletion();
- //delivery adapter methods:
- void deliver(DeliveryRecord&, bool sync);
+ DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
@@ -117,7 +118,7 @@ class SessionState : public qpid::Sessio
// Used by cluster to create replica sessions.
SemanticState& getSemanticState() { return semanticState; }
- boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessageInProgress() { return msgBuilder.getMessage(); }
SessionAdapter& getSessionAdapter() { return adapter; }
const SessionId& getSessionId() const { return getId(); }
@@ -199,7 +200,7 @@ class SessionState : public qpid::Sessio
// If an ingress message does not require a Sync, we need to
// hold a reference to it in case an Execution.Sync command is received and we
// have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > pendingMsgs;
/** complete all pending commands, runs in IO thread */
void completeCommands();
@@ -212,7 +213,7 @@ class SessionState : public qpid::Sessio
~AsyncCommandCompleter() {};
/** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
+ void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
void deletePendingMessage(SequenceNumber id);
void flushPendingMessages();
/** schedule the processing of a completed ingress message.transfer command */
@@ -246,29 +247,29 @@ class SessionState : public qpid::Sessio
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
- boost::intrusive_ptr<Message> m )
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
: AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
+ session(ss),
+ msg(m),
+ requiresAccept(m->requiresAccept()),
+ requiresSync(m->getFrames().getMethod()->isSync()),
+ pending(false) {}
IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
: AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
+ session(x.session),
+ msg(x.msg),
+ requiresAccept(x.requiresAccept),
+ requiresSync(x.requiresSync),
+ pending(x.pending) {}
- virtual ~IncompleteIngressMsgXfer() {};
+ virtual ~IncompleteIngressMsgXfer() {};
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
SessionState *session; // only valid if sync flag in callback is true
- boost::intrusive_ptr<Message> msg;
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg;
bool requiresAccept;
bool requiresSync;
bool pending; // true if msg saved on pending list...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.cpp Mon Aug 27 15:40:33 2012
@@ -28,6 +28,17 @@
namespace qpid {
namespace broker {
+SimpleMessage::SimpleMessage() {}
+
+SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize,
+ boost::intrusive_ptr<PersistableMessage> persistentContext) :
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_persistentContext(persistentContext)
+{}
+
+
+/*
SimpleMessage::SimpleMessage(const char* msgData,
const uint32_t msgSize) :
m_persistenceId(0ULL),
@@ -44,24 +55,28 @@ SimpleMessage::SimpleMessage(const char*
m_store(store),
m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle())
{}
+*/
SimpleMessage::~SimpleMessage() {}
+/*
const MessageHandle&
SimpleMessage::getHandle() const {
- return m_msgHandle;
+ return m_persistentContext.getHandle();
}
MessageHandle&
SimpleMessage::getHandle() {
- return m_msgHandle;
+ return m_persistentContext.getHandle();
}
+*/
uint64_t
SimpleMessage::contentSize() const {
return static_cast<uint64_t>(m_msg.size());
}
+/*
void
SimpleMessage::setPersistenceId(uint64_t id) const {
m_persistenceId = id;
@@ -89,12 +104,18 @@ uint32_t
SimpleMessage::encodedHeaderSize() const {
return 0;
}
-
+*/
bool
SimpleMessage::isPersistent() const {
- return m_store != 0;
+ return m_persistentContext.get() != 0;
}
+boost::intrusive_ptr<PersistableMessage>
+SimpleMessage::getPersistentContext() const {
+ return m_persistentContext;
+}
+
+
uint64_t
SimpleMessage::getSize() {
return m_msg.size();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleMessage.h Mon Aug 27 15:40:33 2012
@@ -25,47 +25,51 @@
#define qpid_broker_SimpleMessage_h_
#include "AsyncStore.h" // DataSource
-#include "MessageHandle.h"
+//#include "MessageHandle.h"
#include "PersistableMessage.h"
namespace qpid {
namespace broker {
-class SimpleMessage: public PersistableMessage,
- public DataSource
+class SimpleMessage: /*public PersistableMessage,*/
+ public DataSource,
+ public RefCounted
{
public:
- SimpleMessage(const char* msgData,
- const uint32_t msgSize);
+ SimpleMessage();
SimpleMessage(const char* msgData,
const uint32_t msgSize,
- AsyncStore* store);
+ boost::intrusive_ptr<PersistableMessage> persistentContext);
virtual ~SimpleMessage();
- const MessageHandle& getHandle() const;
- MessageHandle& getHandle();
+// const MessageHandle& getHandle() const;
+// MessageHandle& getHandle();
uint64_t contentSize() const;
- // --- Interface Persistable ---
- virtual void setPersistenceId(uint64_t id) const;
- virtual uint64_t getPersistenceId() const;
- virtual void encode(qpid::framing::Buffer& buffer) const;
- virtual uint32_t encodedSize() const;
-
- // --- Interface PersistableMessage ---
- virtual void allDequeuesComplete();
- virtual uint32_t encodedHeaderSize() const;
- virtual bool isPersistent() const;
+// // --- Interface Persistable ---
+// virtual void setPersistenceId(uint64_t id) const;
+// virtual uint64_t getPersistenceId() const;
+// virtual void encode(qpid::framing::Buffer& buffer) const;
+// virtual uint32_t encodedSize() const;
+//
+// // --- Interface PersistableMessage ---
+// virtual void allDequeuesComplete();
+// virtual uint32_t encodedHeaderSize() const;
+
+ // Persistent operations
+ bool isPersistent() const;
+ boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
// --- Interface DataSource ---
virtual uint64_t getSize(); // <- same as encodedSize()?
virtual void write(char* target);
private:
- mutable uint64_t m_persistenceId;
+// mutable uint64_t m_persistenceId;
const std::string m_msg;
- AsyncStore* m_store;
+ boost::intrusive_ptr<PersistableMessage> m_persistentContext;
+// AsyncStore* m_store;
- MessageHandle m_msgHandle;
+// MessageHandle m_msgHandle;
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueue.cpp Mon Aug 27 15:40:33 2012
@@ -171,7 +171,7 @@ SimpleQueue::enqueue(SimpleTxnBuffer* tb
return false;
}
if (qm->payload()->isPersistent() && m_store) {
- qm->payload()->enqueueAsync(shared_from_this(), m_store);
+ qm->payload()->getPersistentContext()->enqueueAsync(shared_from_this(), m_store);
return asyncEnqueue(tb, qm);
}
return false;
@@ -190,7 +190,7 @@ SimpleQueue::dequeue(SimpleTxnBuffer* tb
return false;
}
if (qm->payload()->isPersistent() && m_store) {
- qm->payload()->dequeueAsync(shared_from_this(), m_store);
+ qm->payload()->getPersistentContext()->dequeueAsync(shared_from_this(), m_store);
return asyncDequeue(tb, qm);
}
return true;
@@ -316,7 +316,7 @@ SimpleQueue::asyncEnqueue(SimpleTxnBuffe
boost::shared_ptr<SimpleQueuedMessage> qm) {
assert(qm.get());
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm->payload(),
+ /*qm->payload(),*/
tb,
&handleAsyncEnqueueResult,
&m_resultQueue));
@@ -353,7 +353,7 @@ SimpleQueue::asyncDequeue(SimpleTxnBuffe
boost::shared_ptr<SimpleQueuedMessage> qm) {
assert(qm.get());
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm->payload(),
+ /*qm->payload(),*/
tb,
&handleAsyncDequeueResult,
&m_resultQueue));
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SimpleQueuedMessage.cpp Mon Aug 27 15:40:33 2012
@@ -40,7 +40,7 @@ SimpleQueuedMessage::SimpleQueuedMessage
m_msg(msg)
{
if (m_queue->getStore()) {
- m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
+ m_enqHandle = q->getStore()->createEnqueueHandle(msg->getPersistentContext()->getMessageHandle(), q->getHandle());
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.cpp Mon Aug 27 15:40:33 2012
@@ -20,7 +20,8 @@
*/
#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
@@ -30,21 +31,20 @@ namespace qpid {
namespace broker {
namespace {
const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
+bool isQMFv2(const Message& message)
{
- const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+ const qpid::framing::MessageProperties* props = qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<qpid::framing::MessageProperties>();
return props && props->getAppId() == "qmf2";
}
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+bool isThresholdEvent(const Message& message)
{
- if (message->getIsManagementMessage()) {
+ if (message.getIsManagementMessage()) {
//is this a qmf event? if so is it a threshold event?
if (isQMFv2(message)) {
- const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
- if (headers && headers->getAsString("qmf.content") == "_event") {
+ if (message.getPropertyAsString("qmf.content") == "_event") {
//decode as list
- std::string content = message->getFrames().getContent();
+ std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
qpid::types::Variant::List list;
qpid::amqp_0_10::ListCodec::decode(content, list);
if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
@@ -57,7 +57,7 @@ bool isThresholdEvent(const boost::intru
}
}
} else {
- std::string content = message->getFrames().getContent();
+ std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
buffer.getLong();//sequence
@@ -83,9 +83,9 @@ ThresholdAlerts::ThresholdAlerts(const s
repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
-void ThresholdAlerts::enqueued(const QueuedMessage& m)
+void ThresholdAlerts::enqueued(const Message& m)
{
- size += m.payload->contentSize();
+ size += m.getContentSize();
++count;
if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
@@ -94,7 +94,7 @@ void ThresholdAlerts::enqueued(const Que
//enqueued on queues; it may even be that this event
//causes a message to be enqueued on the queue we are
//tracking, and so we need to avoid recursing
- if (isThresholdEvent(m.payload)) return;
+ if (isThresholdEvent(m)) return;
lastAlert = qpid::sys::now();
agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
@@ -102,9 +102,9 @@ void ThresholdAlerts::enqueued(const Que
}
}
-void ThresholdAlerts::dequeued(const QueuedMessage& m)
+void ThresholdAlerts::dequeued(const Message& m)
{
- size -= m.payload->contentSize();
+ size -= m.getContentSize();
--count;
if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
lastAlert = qpid::sys::EPOCH;
@@ -127,65 +127,14 @@ void ThresholdAlerts::observe(Queue& que
}
void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio)
-
-{
- qpid::types::Variant::Map map;
- qpid::amqp_0_10::translate(settings, map);
- observe(queue, agent, map, limitRatio);
-}
-
-template <class T>
-class Option
+ const QueueSettings& settings, uint16_t limitRatio)
{
- public:
- Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
- void addAlias(const std::string& name) { names.push_back(name); }
- T get(const qpid::types::Variant::Map& settings) const
- {
- T value(defaultValue);
- for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
- if (get(settings, *i, value)) break;
- }
- return value;
- }
- private:
- std::vector<std::string> names;
- T defaultValue;
-
- bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
- {
- qpid::types::Variant::Map::const_iterator i = settings.find(name);
- if (i != settings.end()) {
- try {
- value = (T) i->second;
- } catch (const qpid::types::InvalidConversion&) {
- QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
- }
- return true;
- } else {
- return false;
- }
- }
-};
-
-void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio)
-
-{
- //Note: aliases are keys defined by java broker
- Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
- repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
-
//If no explicit threshold settings were given use specified
//percentage of any limit from the policy.
- const QueuePolicy* policy = queue.getPolicy();
- Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
- countThreshold.addAlias("x-qpid-maximum-message-count");
- Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
- sizeThreshold.addAlias("x-qpid-maximum-message-size");
+ uint32_t countThreshold = settings.alertThreshold.hasCount() ? settings.alertThreshold.getCount() : (settings.maxDepth.getCount()*limitRatio/100);
+ uint32_t sizeThreshold = settings.alertThreshold.hasSize() ? settings.alertThreshold.getSize() : (settings.maxDepth.getSize()*limitRatio/100);
- observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+ observe(queue, agent, countThreshold, sizeThreshold, settings.alertRepeatInterval);
}
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ThresholdAlerts.h Mon Aug 27 15:40:33 2012
@@ -27,15 +27,13 @@
#include <string>
namespace qpid {
-namespace framing {
-class FieldTable;
-}
namespace management {
class ManagementAgent;
}
namespace broker {
class Queue;
+struct QueueSettings;
/**
* Class to manage generation of QMF alerts when particular thresholds
* are breached on a queue.
@@ -48,19 +46,17 @@ class ThresholdAlerts : public QueueObse
const uint32_t countThreshold,
const uint64_t sizeThreshold,
const long repeatInterval);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {};
- void requeued(const QueuedMessage&) {};
+ void enqueued(const Message&);
+ void dequeued(const Message&);
+ void acquired(const Message&) {};
+ void requeued(const Message&) {};
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
const long repeatInterval);
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio);
- static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+ const QueueSettings& settings, uint16_t limitRatio);
private:
const std::string name;
qpid::management::ManagementAgent& agent;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxAccept.h Mon Aug 27 15:40:33 2012
@@ -71,7 +71,6 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAccept(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
// Used by cluster replication.
const framing::SequenceSet& getAcked() const { return acked; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.cpp Mon Aug 27 15:40:33 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ for(op_iterator i = ops.begin(); i != ops.end(); i++){
if(!(*i)->prepare(ctxt)){
return false;
}
@@ -74,7 +74,3 @@ bool TxBuffer::commitLocal(Transactional
}
return false;
}
-
-void TxBuffer::accept(TxOpConstVisitor& v) const {
- std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org