You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC
svn commit: r1371676 [5/8] - in /qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/
cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/
cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueDepth.h"
+
+namespace qpid {
+namespace broker {
+
+QueueDepth::QueueDepth() {}
+QueueDepth::QueueDepth(uint32_t c, uint64_t s) : count(c), size(s) {}
+QueueDepth& QueueDepth::operator+=(const QueueDepth& other)
+{
+ if (count.valid) count.value += other.count.value;
+ if (size.valid) size.value += other.size.value;
+ return *this;
+}
+QueueDepth& QueueDepth::operator-=(const QueueDepth& other)
+{
+ if (count.valid) count.value -= other.count.value;
+ if (size.valid) size.value -= other.size.value;
+ return *this;
+}
+bool QueueDepth::operator==(const QueueDepth& other) const
+{
+ //only compare values, not validity an invalid value is always 0;
+ //this means that an invalid value will match an empty queue
+ //depth, which is fine
+ return (count.value == other.count.value)
+ && (size.value == other.size.value);
+}
+bool QueueDepth::operator!=(const QueueDepth& other) const
+{
+ return !(*this == other);
+}
+bool QueueDepth::operator<(const QueueDepth& other) const
+{
+ if (count.valid && size.valid)
+ return count.value < other.count.value || size.value < other.size.value;
+ else if (count.valid)
+ return count.value < other.count.value;
+ else
+ return size.value < other.size.value;
+}
+bool QueueDepth::operator>(const QueueDepth& other) const
+{
+ if (count.valid && size.valid)
+ return count.value > other.count.value || size.value > other.size.value;
+ else if (count.valid)
+ return count.value > other.count.value;
+ else
+ return size.value > other.size.value;
+}
+bool QueueDepth::hasCount() const { return count.valid; }
+uint32_t QueueDepth::getCount() const { return count.value; }
+void QueueDepth::setCount(uint32_t c) { count.value = c; count.valid = true; }
+bool QueueDepth::hasSize() const { return size.valid; }
+uint64_t QueueDepth::getSize() const { return size.value; }
+void QueueDepth::setSize(uint64_t c) { size.value = c; size.valid = true; }
+
+namespace{
+ template <typename T> QueueDepth::Optional<T> add(const QueueDepth::Optional<T>& a, const QueueDepth::Optional<T>& b)
+ {
+ QueueDepth::Optional<T> result;
+ if (a.valid && b.valid) {
+ result.valid = true;
+ result.value = a.value + b.value;
+ }
+ return result;
+ }
+ template <typename T> QueueDepth::Optional<T> subtract(const QueueDepth::Optional<T>& a, const QueueDepth::Optional<T>& b)
+ {
+ QueueDepth::Optional<T> result;
+ if (a.valid && b.valid) {
+ result.valid = true;
+ result.value = a.value - b.value;
+ }
+ return result;
+ }
+}
+QueueDepth operator-(const QueueDepth& a, const QueueDepth& b)
+{
+ QueueDepth result;
+ result.count = subtract(a.count, b.count);
+ result.size = subtract(a.size, b.size);
+ return result;
+}
+
+QueueDepth operator+(const QueueDepth& a, const QueueDepth& b)
+{
+ QueueDepth result;
+ result.count = add(a.count, b.count);
+ result.size = add(a.size, b.size);
+ return result;
+
+}
+
+std::ostream& operator<<(std::ostream& o, const QueueDepth& d)
+{
+ if (d.hasCount()) o << "count: " << d.getCount();
+ if (d.hasSize()) {
+ if (d.hasCount()) o << ", ";
+ o << "size: " << d.getSize();
+ }
+ return o;
+}
+
+QueueDepth::operator bool() const { return hasCount() || hasSize(); }
+
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,74 @@
+#ifndef QPID_BROKER_QUEUEDEPTH_H
+#define QPID_BROKER_QUEUEDEPTH_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Represents a queue depth in message count and/or aggregate message
+ * size.
+ */
+class QueueDepth
+{
+ public:
+ QPID_BROKER_EXTERN QueueDepth();
+ QPID_BROKER_EXTERN QueueDepth(uint32_t count, uint64_t size);
+ QPID_BROKER_EXTERN QueueDepth& operator+=(const QueueDepth&);
+ QPID_BROKER_EXTERN QueueDepth& operator-=(const QueueDepth&);
+ QPID_BROKER_EXTERN bool operator==(const QueueDepth&) const;
+ QPID_BROKER_EXTERN bool operator!=(const QueueDepth&) const;
+ QPID_BROKER_EXTERN bool operator<(const QueueDepth& other) const;
+ QPID_BROKER_EXTERN bool operator>(const QueueDepth& other) const;
+ QPID_BROKER_EXTERN operator bool() const;
+ QPID_BROKER_EXTERN bool hasCount() const;
+ QPID_BROKER_EXTERN uint32_t getCount() const;
+ QPID_BROKER_EXTERN void setCount(uint32_t);
+ QPID_BROKER_EXTERN bool hasSize() const;
+ QPID_BROKER_EXTERN uint64_t getSize() const;
+ QPID_BROKER_EXTERN void setSize(uint64_t);
+ friend QPID_BROKER_EXTERN QueueDepth operator-(const QueueDepth&, const QueueDepth&);
+ friend QPID_BROKER_EXTERN QueueDepth operator+(const QueueDepth&, const QueueDepth&);
+ template <typename T> struct Optional
+ {
+ T value;
+ bool valid;
+
+ Optional(T v) : value(v), valid(true) {}
+ Optional() : value(0), valid(false) {}
+ };
+ private:
+ Optional<uint32_t> count;
+ Optional<uint64_t> size;
+};
+
+QPID_BROKER_EXTERN QueueDepth operator-(const QueueDepth&, const QueueDepth&);
+QPID_BROKER_EXTERN QueueDepth operator+(const QueueDepth&, const QueueDepth&);
+std::ostream& operator<<(std::ostream&, const QueueDepth&);
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEDEPTH_H*/
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueFactory.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/LossyQueue.h"
+#include "qpid/broker/Lvq.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDistributor.h"
+#include "qpid/broker/MessageGroupManager.h"
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/FifoDistributor.h"
+#include <map>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+
+QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
+
+boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
+{
+ settings.validate();
+
+ //1. determine Queue type (i.e. whether we are subclassing Queue)
+ // -> if 'ring' policy is in use then subclass
+ boost::shared_ptr<Queue> queue;
+ if (settings.dropMessagesAtLimit) {
+ queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+ } else if (settings.lvqKey.size()) {
+ std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey));
+ queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker));
+ } else {
+ queue = boost::shared_ptr<Queue>(new Queue(name, settings, settings.durable ? store : 0, parent, broker));
+ }
+
+ //2. determine Messages type (i.e. structure)
+ if (settings.priorities) {
+ if (settings.defaultFairshare || settings.fairshare.size()) {
+ queue->messages = Fairshare::create(settings);
+ } else {
+ queue->messages = std::auto_ptr<Messages>(new PriorityQueue(settings.priorities));
+ }
+ } else if (settings.lvqKey.empty()) {//LVQ already handled above
+ queue->messages = std::auto_ptr<Messages>(new MessageDeque());
+ }
+
+ //3. determine MessageDistributor type
+ if (settings.groupKey.size()) {
+ boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( name, *(queue->messages), settings));
+ queue->allocator = mgm;
+ queue->addObserver(mgm);
+ } else {
+ queue->allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *(queue->messages) ));
+ }
+
+
+ //4. threshold event config
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*queue, *(broker->getManagementAgent()), settings, broker->getOptions().queueThresholdEventRatio);
+ }
+ //5. flow control config
+ QueueFlowLimit::observe(*queue, settings);
+
+ return queue;
+}
+
+void QueueFactory::setBroker(Broker* b)
+{
+ broker = b;
+}
+Broker* QueueFactory::getBroker()
+{
+ return broker;
+}
+void QueueFactory::setStore (MessageStore* s)
+{
+ store = s;
+}
+MessageStore* QueueFactory::getStore() const
+{
+ return store;
+}
+void QueueFactory::setParent(management::Manageable* p)
+{
+ parent = p;
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,73 @@
+#ifndef QPID_BROKER_QUEUEFACTORY_H
+#define QPID_BROKER_QUEUEFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace management {
+class Manageable;
+}
+namespace broker {
+class Broker;
+class MessageStore;
+class Queue;
+struct QueueSettings;
+
+/**
+ * Handles the creation and configuration of a Queue instance in order
+ * to meet the required settings
+ */
+class QueueFactory
+{
+ public:
+ QPID_BROKER_EXTERN QueueFactory();
+
+ QPID_BROKER_EXTERN boost::shared_ptr<Queue> create(const std::string& name, const QueueSettings& settings);
+
+ void setBroker(Broker*);
+ Broker* getBroker();
+
+ /**
+ * Set the store to use. May only be called once.
+ */
+ void setStore (MessageStore*);
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* getStore() const;
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent(management::Manageable*);
+ private:
+ Broker* broker;
+ MessageStore* store;
+ management::Manageable* parent;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEFACTORY_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Aug 10 12:04:27 2012
@@ -20,7 +20,9 @@
*/
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -57,34 +59,6 @@ namespace {
<< "=" << max));
}
}
-
- /** extract a capacity value as passed in an argument map
- */
- uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
- {
- FieldTable::ValuePtr v = settings.get(key);
-
- int64_t result = 0;
-
- if (!v) return defaultValue;
- if (v->getType() == 0x23) {
- QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
- } else if (v->getType() == 0x33) {
- QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int64_t>()) {
- result = v->get<int64_t>();
- QPID_LOG(debug, "Got integer value for " << key << ": " << result);
- if (result >= 0) return result;
- } else if (v->convertsTo<std::string>()) {
- std::string s(v->get<std::string>());
- QPID_LOG(debug, "Got string value for " << key << ": " << s);
- std::istringstream convert(s);
- if (convert >> result && result >= 0) return result;
- }
-
- QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
- return defaultValue;
- }
}
@@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
if (queue) {
queueName = _queue->getName();
- if (queue->getPolicy()) {
- maxSize = _queue->getPolicy()->getMaxSize();
- maxCount = _queue->getPolicy()->getMaxCount();
- }
+ if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
+ if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
if (queueMgmtObj) {
@@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit()
sys::Mutex::ScopedLock l(indexLock);
if (!index.empty()) {
// we're gone - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
try {
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
} catch (...) {} // ignore - not safe for a destructor to throw.
index.clear();
}
}
-void QueueFlowLimit::enqueued(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.payload->contentSize();
+ size += msg.getContentSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const Queu
if (flowStopped || !index.empty()) {
// ignore flow control if we are populating the queue due to cluster replication:
if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+ QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
return;
}
- QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
- msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
+ QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
+ msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}
@@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const Queu
-void QueueFlowLimit::dequeued(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
@@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const Queu
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.payload->contentSize();
+ uint64_t _size = msg.getContentSize();
if (_size <= size) {
size -= _size;
} else {
@@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const Queu
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+ std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence());
if (itr != index.end()) { // this msg is flow controlled, release it:
- msg.payload->getIngressCompletion().finishCompleter();
+ msg.getPersistentContext()->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_
}
-void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
{
QueueFlowLimit *ptr = createLimit( &queue, settings );
if (ptr) {
@@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queu
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
{
- std::string type(QueuePolicy::getType(settings));
-
- if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+ if (settings.dropMessagesAtLimit) {
// The size of a RING queue is limited by design - no need for flow control.
return 0;
}
- if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
- settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+ if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
// user provided (some) flow settings manually...
- uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
- uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
- uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
- uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
- if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
+ if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
+ return new QueueFlowLimit(queue,
+ settings.flowStop.getCount(),
+ settings.flowResume.getCount(),
+ settings.flowStop.getSize(),
+ settings.flowResume.getSize());
+ } else {
+ //don't have a non-zero value for either the count or the
+ //size to stop at, yet at least one of these settings was
+ //provided, i.e it was set to 0 explicitly which we treat
+ //as turning it off
return 0;
}
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
if (defaultFlowStopRatio) { // broker has a default ratio setup...
- uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+ uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default
+ uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
return 0;
@@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::fram
framing::SequenceSet ss;
if (!index.empty()) {
/* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
itr != index.end(); ++itr) {
ss.add(itr->first);
}
@@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg;
+ Message msg;
queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Fri Aug 10 12:04:27 2012
@@ -26,9 +26,9 @@
#include <iostream>
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -45,6 +45,8 @@ namespace qpid {
namespace broker {
class Broker;
+class Queue;
+struct QueueSettings;
/**
* Producer flow control: when level is > flowStop*, flow control is ON.
@@ -80,13 +82,13 @@ class Broker;
QPID_BROKER_EXTERN virtual ~QueueFlowLimit();
- /** the queue has added QueuedMessage. Returns true if flow state changes */
- QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
- /** the queue has removed QueuedMessage. Returns true if flow state changes */
- QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+ /** the queue has added QueuedMessage */
+ QPID_BROKER_EXTERN void enqueued(const Message&);
+ /** the queue has removed QueuedMessage */
+ QPID_BROKER_EXTERN void dequeued(const Message&);
/** ignored */
- QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
- QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
+ QPID_BROKER_EXTERN void acquired(const Message&) {};
+ QPID_BROKER_EXTERN void requeued(const Message&) {};
/** for clustering: */
QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
@@ -106,14 +108,14 @@ class Broker;
void decode(framing::Buffer& buffer);
uint32_t encodedSize() const;
- static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
protected:
// msgs waiting for flow to become available.
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
+ std::map<framing::SequenceNumber, Message > index;
mutable qpid::sys::Mutex indexLock;
_qmfBroker::Queue *queueMgmtObj;
@@ -123,7 +125,7 @@ class Broker;
QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize);
- static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h Fri Aug 10 12:04:27 2012
@@ -24,8 +24,8 @@
namespace qpid {
namespace broker {
-struct QueuedMessage;
class Consumer;
+class Message;
/**
* Interface for notifying classes who want to act as 'observers' of a queue of particular
@@ -63,10 +63,10 @@ class QueueObserver
virtual ~QueueObserver() {}
// note: the Queue will hold the messageLock while calling these methods!
- virtual void enqueued(const QueuedMessage&) = 0;
- virtual void dequeued(const QueuedMessage&) = 0;
- virtual void acquired(const QueuedMessage&) = 0;
- virtual void requeued(const QueuedMessage&) = 0;
+ virtual void enqueued(const Message&) = 0;
+ virtual void dequeued(const Message&) = 0;
+ virtual void acquired(const Message&) = 0;
+ virtual void requeued(const Message&) = 0;
virtual void consumerAdded( const Consumer& ) {};
virtual void consumerRemoved( const Consumer& ) {};
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Aug 10 12:04:27 2012
@@ -21,7 +21,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -32,50 +31,43 @@ using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
-QueueRegistry::QueueRegistry(Broker* b) :
- counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
+QueueRegistry::QueueRegistry(Broker* b)
+{
+ setBroker(b);
+}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const OwnershipToken* owner,
+QueueRegistry::declare(const string& name, const QueueSettings& settings,
boost::shared_ptr<Exchange> alternate,
- const qpid::framing::FieldTable& arguments,
bool recovering/*true if this declare is a
result of recovering queue
- definition from persistente
+ definition from persistent
record*/)
{
- Queue::shared_ptr queue;
std::pair<Queue::shared_ptr, bool> result;
{
RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
QueueMap::iterator i = queues.find(name);
-
if (i == queues.end()) {
- queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ Queue::shared_ptr queue = create(name, settings);
+ //Move this to factory also?
if (alternate) {
queue->setAlternateExchange(alternate);//need to do this *before* create
alternate->incAlternateUsers();
}
if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
- } else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ //create persistent record if required
+ queue->create();
}
queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
}
- if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
@@ -89,7 +81,7 @@ void QueueRegistry::destroy(const string
queues.erase(i);
}
}
- if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
+ if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
@@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(con
return q;
}
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
-
void QueueRegistry::setStore (MessageStore* _store)
{
- store = _store;
+ QueueFactory::setStore(_store);
}
-MessageStore* QueueRegistry::getStore() const {
- return store;
+MessageStore* QueueRegistry::getStore() const
+{
+ return QueueFactory::getStore();
}
-void QueueRegistry::updateQueueClusterState(bool _lastNode)
+void QueueRegistry::setParent(qpid::management::Manageable* _parent)
{
- RWlock::ScopedRlock locker(lock);
- for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
- if (_lastNode){
- i->second->setLastNodeFailure();
- } else {
- i->second->clearLastNodeFailure();
- }
- }
- lastNode = _lastNode;
+ QueueFactory::setParent(_parent);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Fri Aug 10 12:04:27 2012
@@ -22,8 +22,8 @@
#define _QueueRegistry_
#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueFactory.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/management/Manageable.h"
#include "qpid/framing/FieldTable.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -34,11 +34,8 @@ namespace qpid {
namespace broker {
class Queue;
-class QueueEvents;
class Exchange;
class OwnershipToken;
-class Broker;
-class MessageStore;
/**
* A registry of queues indexed by queue name.
@@ -47,7 +44,7 @@ class MessageStore;
* are deleted when and only when they are no longer in use.
*
*/
-class QueueRegistry {
+class QueueRegistry : QueueFactory {
public:
QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
QPID_BROKER_EXTERN ~QueueRegistry();
@@ -60,11 +57,8 @@ class QueueRegistry {
*/
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
const std::string& name,
- bool durable = false,
- bool autodelete = false,
- const OwnershipToken* owner = 0,
+ const QueueSettings& settings,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
- const qpid::framing::FieldTable& args = framing::FieldTable(),
bool recovering = false);
/**
@@ -101,11 +95,6 @@ class QueueRegistry {
QPID_BROKER_EXTERN boost::shared_ptr<Queue> get(const std::string& name);
/**
- * Generate unique queue name.
- */
- std::string generateName();
-
- /**
* Set the store to use. May only be called once.
*/
void setStore (MessageStore*);
@@ -118,7 +107,7 @@ class QueueRegistry {
/**
* Register the manageable parent for declared queues
*/
- void setParent (management::Manageable* _parent) { parent = _parent; }
+ void setParent (management::Manageable*);
/** Call f for each queue in the registry. */
template <class F> void eachQueue(F f) const {
@@ -127,22 +116,10 @@ class QueueRegistry {
f(i->second);
}
- /**
- * Change queue mode when cluster size drops to 1 node, expands again
- * in practice allows flow queue to disk when last name to be exectuted
- */
- void updateQueueClusterState(bool lastNode);
-
private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
mutable qpid::sys::RWlock lock;
- int counter;
- MessageStore* store;
- QueueEvents* events;
- management::Manageable* parent;
- bool lastNode; //used to set mode on queue declare
- Broker* broker;
};
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueSettings.h"
+#include "QueueFlowLimit.h"
+#include "MessageGroupManager.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+
+
+namespace qpid {
+namespace broker {
+
+namespace {
+const std::string MAX_COUNT("qpid.max_count");
+const std::string MAX_SIZE("qpid.max_size");
+const std::string POLICY_TYPE("qpid.policy_type");
+const std::string POLICY_TYPE_REJECT("reject");
+const std::string POLICY_TYPE_RING("ring");
+const std::string NO_LOCAL("no-local");
+const std::string TRACE_ID("qpid.trace.id");
+const std::string TRACE_EXCLUDES("qpid.trace.exclude");
+const std::string LVQ_KEY("qpid.last_value_queue_key");
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+const std::string ALERT_REPEAT_GAP("qpid.alert_repeat_gap");
+const std::string ALERT_COUNT("qpid.alert_count");
+const std::string ALERT_SIZE("qpid.alert_size");
+const std::string PRIORITIES("qpid.priorities");
+const std::string FAIRSHARE("qpid.fairshare");
+const std::string FAIRSHARE_ALIAS("x-qpid-fairshare");
+
+const std::string LVQ_LEGACY("qpid.last_value_queue");
+const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
+const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
+
+
+bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
+{
+ if (key.find(basename) == 0) {
+ qpid::types::Variant index(key.substr(basename.size()+1));
+ settings.fairshare[index] = value;
+ return true;
+ } else {
+ return false;
+ }
+}
+bool isFairshareSetting(const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
+{
+ return handleFairshareSetting(FAIRSHARE, key, value, settings) || handleFairshareSetting(FAIRSHARE_ALIAS, key, value, settings);
+}
+}
+
+const QueueSettings::Aliases QueueSettings::aliases;
+
+QueueSettings::QueueSettings(bool d, bool a) :
+ durable(d),
+ autodelete(a),
+ priorities(0),
+ defaultFairshare(0),
+ shareGroups(false),
+ addTimestamp(false),
+ dropMessagesAtLimit(false),
+ noLocal(false),
+ autoDeleteDelay(0),
+ alertRepeatInterval(60)
+{}
+
+bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
+{
+ if (key == MAX_COUNT && value.asUint32() > 0) {
+ maxDepth.setCount(value);
+ return true;
+ } else if (key == MAX_SIZE && value.asUint64() > 0) {
+ maxDepth.setSize(value);
+ return true;
+ } else if (key == POLICY_TYPE) {
+ if (value.getString() == POLICY_TYPE_RING) {
+ dropMessagesAtLimit = true;
+ return true;
+ } else if (value.getString() == POLICY_TYPE_REJECT) {
+ //do nothing, thats the default
+ return true;
+ } else {
+ QPID_LOG(warning, "Unrecognised policy option: " << value);
+ return false;
+ }
+ } else if (key == NO_LOCAL) {
+ noLocal = true;
+ return true;
+ } else if (key == TRACE_ID) {
+ traceId = value.asString();
+ return true;
+ } else if (key == TRACE_EXCLUDES) {
+ traceExcludes = value.asString();
+ return true;
+ } else if (key == PRIORITIES) {
+ priorities = value;
+ return true;
+ } else if (key == FAIRSHARE) {
+ defaultFairshare = value;
+ return true;
+ } else if (isFairshareSetting(key, value, *this)) {
+ return true;
+ } else if (key == MessageGroupManager::qpidMessageGroupKey) {
+ groupKey = value.asString();
+ return true;
+ } else if (key == MessageGroupManager::qpidSharedGroup) {
+ shareGroups = value;
+ return true;
+ } else if (key == MessageGroupManager::qpidMessageGroupTimestamp) {
+ addTimestamp = value;
+ return true;
+ } else if (key == LVQ_KEY) {
+ lvqKey = value.asString();
+ return true;
+ } else if (key == LVQ_LEGACY) {
+ if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
+ return true;
+ } else if (key == LVQ_LEGACY_NOBROWSE) {
+ QPID_LOG(warning, "Ignoring 'no-browse' directive for LVQ; it is no longer necessary");
+ if (lvqKey.empty()) lvqKey = LVQ_LEGACY_KEY;
+ return true;
+ } else if (key == AUTO_DELETE_TIMEOUT) {
+ autoDeleteDelay = value;
+ return true;
+ } else if (key == QueueFlowLimit::flowStopCountKey) {
+ flowStop.setCount(value);
+ return true;
+ } else if (key == QueueFlowLimit::flowResumeCountKey) {
+ flowResume.setCount(value);
+ return true;
+ } else if (key == QueueFlowLimit::flowStopSizeKey) {
+ flowStop.setSize(value);
+ return true;
+ } else if (key == QueueFlowLimit::flowResumeSizeKey) {
+ flowResume.setSize(value);
+ return true;
+ } else if (key == ALERT_REPEAT_GAP) {
+ alertRepeatInterval = value;
+ return true;
+ } else if (key == ALERT_COUNT) {
+ alertThreshold.setCount(value);
+ return true;
+ } else if (key == ALERT_SIZE) {
+ alertThreshold.setSize(value);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void QueueSettings::validate() const
+{
+ if (lvqKey.size() && priorities > 0)
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PRIORITIES << " for the same queue"));
+ if ((fairshare.size() || defaultFairshare) && priorities == 0)
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify fairshare settings when queue is not enabled for priorities"));
+ if (fairshare.size() > priorities)
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot have fairshare set for priority levels greater than " << priorities));
+ if (groupKey.size() && lvqKey.size())
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
+ if (groupKey.size() && priorities)
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << MessageGroupManager::qpidMessageGroupKey << " for the same queue"));
+ if (shareGroups && groupKey.empty()) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidSharedGroup
+ << " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+ }
+ if (addTimestamp && groupKey.empty()) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MessageGroupManager::qpidMessageGroupTimestamp
+ << " if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+ }
+
+ // @todo: remove once "sticky" consumers are supported - see QPID-3347
+ if (!shareGroups && groupKey.size()) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Only shared groups are supported at present; " << MessageGroupManager::qpidSharedGroup
+ << " is required if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
+ }
+}
+
+void QueueSettings::populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused)
+{
+ original = inputs;
+ for (qpid::types::Variant::Map::const_iterator i = inputs.begin(); i != inputs.end(); ++i) {
+ Aliases::const_iterator a = aliases.find(i->first);
+ if (!handle((a != aliases.end() ? a->second : i->first), i->second)) unused.insert(*i);
+ }
+}
+void QueueSettings::populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused)
+{
+ qpid::types::Variant::Map o;
+ qpid::amqp_0_10::translate(inputs, original);
+ populate(original, o);
+ qpid::amqp_0_10::translate(o, unused);
+}
+std::map<std::string, qpid::types::Variant> QueueSettings::asMap() const
+{
+ return original;
+}
+
+QueueSettings::Aliases::Aliases()
+{
+ insert(value_type("x-qpid-priorities", "qpid.priorities"));
+ insert(value_type("x-qpid-fairshare", "qpid.fairshare"));
+ insert(value_type("x-qpid-minimum-alert-repeat-gap", "qpid.alert_repeat_gap"));
+ insert(value_type("x-qpid-maximum-message-count", "qpid.alert_count"));
+ insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size"));
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,92 @@
+#ifndef QPID_BROKER_QUEUESETTINGS_H
+#define QPID_BROKER_QUEUESETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueueDepth.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/framing/FieldTable.h"
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace types {
+class Variant;
+}
+namespace broker {
+
+/**
+ * Defines the various queue configuration settings that can be specified
+ */
+struct QueueSettings
+{
+ QPID_BROKER_EXTERN QueueSettings(bool durable=false, bool autodelete=false);
+
+ bool durable;
+ bool autodelete;
+
+ //basic queue types:
+ std::string lvqKey;
+ uint32_t priorities;
+ uint32_t defaultFairshare;
+ std::map<uint32_t,uint32_t> fairshare;
+
+ //message groups:
+ std::string groupKey;
+ bool shareGroups;
+ bool addTimestamp;//not actually used; always on at present?
+
+ QueueDepth maxDepth;
+ bool dropMessagesAtLimit;//aka ring queue policy
+
+ bool noLocal;
+ std::string traceId;
+ std::string traceExcludes;
+ uint64_t autoDeleteDelay;//queueTtl?
+
+ //flow control:
+ QueueDepth flowStop;
+ QueueDepth flowResume;
+
+ //threshold events:
+ QueueDepth alertThreshold;
+ int64_t alertRepeatInterval;
+
+ //yuck, yuck
+ qpid::framing::FieldTable storeSettings;
+ std::map<std::string, qpid::types::Variant> original;
+
+ bool handle(const std::string& key, const qpid::types::Variant& value);
+ void validate() const;
+ QPID_BROKER_EXTERN void populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused);
+ QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused);
+ std::map<std::string, qpid::types::Variant> asMap() const;
+
+ struct Aliases : std::map<std::string, std::string>
+ {
+ Aliases();
+ };
+ static const Aliases aliases;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUESETTINGS_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Fri Aug 10 12:04:27 2012
@@ -22,8 +22,8 @@
#define _QueuedMessage_
#include "qpid/broker/Message.h"
-#include "BrokerImportExport.h"
-#include <iosfwd>
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/broker/BrokerImportExport.h"
namespace qpid {
namespace broker {
@@ -32,20 +32,19 @@ class Queue;
struct QueuedMessage
{
- boost::intrusive_ptr<Message> payload;
+ Message message;
framing::SequenceNumber position;
- typedef enum { AVAILABLE, ACQUIRED, DELETED, REMOVED } Status;
- Status status;
+ enum {AVAILABLE, ACQUIRED, DELETED, REMOVED} status;
Queue* queue;
- QueuedMessage(Queue* q=0,
- boost::intrusive_ptr<Message> msg=0,
- framing::SequenceNumber sn=0,
- Status st=AVAILABLE
- ) : payload(msg), position(sn), status(st), queue(q) {}
+ QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q, Message msg, framing::SequenceNumber sn) :
+ message(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
};
-inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
+inline bool operator<(const QueuedMessage& a, const QueuedMessage& b)
+{
return a.position < b.position;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Fri Aug 10 12:04:27 2012
@@ -22,10 +22,9 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredDequeue.h"
-using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
{
queue->recoverPrepared(msg);
}
@@ -38,11 +37,11 @@ bool RecoveredDequeue::prepare(Transacti
void RecoveredDequeue::commit() throw()
{
- queue->enqueueAborted(msg);
+ queue->dequeueCommited(msg);
}
void RecoveredDequeue::rollback() throw()
{
- queue->process(msg);
+ queue->dequeueAborted(msg);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Fri Aug 10 12:04:27 2012
@@ -26,8 +26,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
-#include <boost/intrusive_ptr.hpp>
-
#include <algorithm>
#include <functional>
#include <list>
@@ -36,18 +34,17 @@ namespace qpid {
namespace broker {
class RecoveredDequeue : public TxOp{
boost::shared_ptr<Queue> queue;
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+ RecoveredDequeue(boost::shared_ptr<Queue> queue, Message msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
boost::shared_ptr<Queue> getQueue() const { return queue; }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ Message getMessage() const { return msg; }
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Fri Aug 10 12:04:27 2012
@@ -22,10 +22,9 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredEnqueue.h"
-using boost::intrusive_ptr;
using namespace qpid::broker;
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg)
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message _msg) : queue(_queue), msg(_msg)
{
queue->recoverPrepared(msg);
}
@@ -36,7 +35,7 @@ bool RecoveredEnqueue::prepare(Transacti
}
void RecoveredEnqueue::commit() throw(){
- queue->process(msg);
+ queue->enqueueCommited(msg);
}
void RecoveredEnqueue::rollback() throw(){
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Fri Aug 10 12:04:27 2012
@@ -26,8 +26,6 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/TxOp.h"
-#include <boost/intrusive_ptr.hpp>
-
#include <algorithm>
#include <functional>
#include <list>
@@ -36,19 +34,17 @@ namespace qpid {
namespace broker {
class RecoveredEnqueue : public TxOp{
boost::shared_ptr<Queue> queue;
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+ RecoveredEnqueue(boost::shared_ptr<Queue> queue, Message msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredEnqueue(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
boost::shared_ptr<Queue> getQueue() const { return queue; }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
-
+ Message getMessage() const { return msg; }
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Aug 10 12:04:27 2012
@@ -21,11 +21,13 @@
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
using boost::dynamic_pointer_cast;
@@ -43,9 +45,9 @@ RecoveryManagerImpl::~RecoveryManagerImp
class RecoverableMessageImpl : public RecoverableMessage
{
- intrusive_ptr<Message> msg;
+ Message msg;
public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
+ RecoverableMessageImpl(const Message& _msg);
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
@@ -128,9 +130,10 @@ RecoverableQueue::shared_ptr RecoveryMan
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- boost::intrusive_ptr<Message> message(new Message());
- message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
+ //TODO: determine encoding/version actually used
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
@@ -163,12 +166,7 @@ void RecoveryManagerImpl::recoveryComple
exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
}
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
- if (!msg->isPersistent()) {
- msg->forcePersistent(); // set so that message will get dequeued from store.
- }
-}
+RecoverableMessageImpl:: RecoverableMessageImpl(const Message& _msg) : msg(_msg) {}
bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
{
@@ -177,7 +175,7 @@ bool RecoverableMessageImpl::loadContent
void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
{
- msg->decodeContent(buffer);
+ msg.getPersistentContext()->decodeContent(buffer);
}
void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
@@ -187,12 +185,12 @@ void RecoverableMessageImpl::recover(Que
void RecoverableMessageImpl::setPersistenceId(uint64_t id)
{
- msg->setPersistenceId(id);
+ msg.getPersistentContext()->setPersistenceId(id);
}
void RecoverableMessageImpl::setRedelivered()
{
- msg->redeliver();
+ msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
@@ -204,7 +202,7 @@ void RecoverableQueueImpl::setPersistenc
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Aug 10 12:04:27 2012
@@ -29,7 +29,7 @@
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SequenceSet.h"
@@ -65,9 +65,8 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
+SemanticState::SemanticState(SessionState& ss)
: session(ss),
- deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
@@ -89,7 +88,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
- recover(true);
+ requeue();
//now unsubscribe, which may trigger queue deletion and thus
//needs to occur after the requeueing of unacked messages
@@ -124,7 +123,7 @@ void SemanticState::consume(const string
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
c = ConsumerImpl::shared_ptr(
- new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
@@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _acquire,
+ SubscriptionType type,
bool _exclusive,
const string& _tag,
const string& _resumeId,
@@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImp
const framing::FieldTable& _arguments
) :
- Consumer(_name, _acquire),
+Consumer(_name, type),
parent(_parent),
queue(_queue),
ackExpected(ack),
- acquire(_acquire),
+ acquire(type == CONSUMER),
blocked(true),
exclusive(_exclusive),
resumeId(_resumeId),
@@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerI
return &(parent->session);
}
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+{
+ return deliver(cursor, msg, shared_from_this());
+}
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
assertClusterSafe();
- allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
- shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
+ allocateCredit(msg);
+ DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+ consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- parent->deliver(record, sync);
+ const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
+
+ record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+ ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
+ acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
+ msg.getAnnotations(),
+ sync));
if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- msg.queue->dequeue(0, msg);
+ queue->dequeue(0 /*ctxt*/, cursor);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
+bool SemanticState::ConsumerImpl::filter(const Message&)
{
return true;
}
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::accept(const Message& msg)
{
assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
@@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const Co
}
}
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
assertClusterSafe();
Credit original = credit;
- credit.consume(1, msg->getRequiredCredit());
+ credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
}
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
{
- bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+ bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
- << " credit for message of " << msg->getRequiredCredit() << " bytes: "
+ << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
<< credit);
return enoughCredit;
}
@@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl
session.getConnection().outputTasks.removeOutputTask(c.get());
}
-
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
disable(c);
@@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl:
c->cancel();
}
-void SemanticState::handle(intrusive_ptr<Message> msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- if (msg->isContentReleaseRequested()) {
- // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
- // presence of these messages). Do not change these without also checking these tests.
- if (msg->isContentReleaseBlocked()) {
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
- } else {
- msg->releaseContent();
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released");
- }
- }
- }
-}
-
-namespace
+TxBuffer* SemanticState::getTxBuffer()
{
-const std::string nullstring;
+ return txBuffer.get();
}
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+void SemanticState::route(Message& msg, Deliverable& strategy) {
+ msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
- std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName
- || cacheExchange->isDestroyed())
- {
+ std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
- cacheExchange->setProperties(msg);
/* verify the userid if specified: */
- std::string id =
- msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+ std::string id = msg.getUserId();
if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<
AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
- if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
+ if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() ))
throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
- exchangeName << " with routing-key " << msg->getRoutingKey()));
+ exchangeName << " with routing-key " << msg.getRoutingKey()));
}
cacheExchange->route(strategy);
@@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy);
}
- if (!strategy.delivered) {
- msg->destroy();
- }
}
}
@@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::comple
}
}
-void SemanticState::recover(bool requeue)
+void SemanticState::requeue()
{
- if(requeue){
- //take copy and clear unacked as requeue may result in redelivery to this session
- //which will in turn result in additions to unacked
- DeliveryRecords copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
- //unconfirmed messages re redelivered and therefore have their
- //id adjusted, confirmed messages are not and so the ordering
- //w.r.t id is lost
- sort(unacked.begin(), unacked.end());
- }
+ //take copy and clear unacked as requeue may result in redelivery to this session
+ //which will in turn result in additions to unacked
+ DeliveryRecords copy = unacked;
+ unacked.clear();
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
getSession().setUnackedCount(unacked.size());
}
-void SemanticState::deliver(DeliveryRecord& msg, bool sync)
-{
- return deliveryAdapter.deliver(msg, sync);
-}
+
+SessionContext& SemanticState::getSession() { return session; }
+const SessionContext& SemanticState::getSession() const { return session; }
+
const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Aug 10 12:04:27 2012
@@ -26,7 +26,6 @@
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Credit.h"
#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxManager.h"
@@ -34,12 +33,15 @@
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/TxBuffer.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
@@ -47,13 +49,15 @@
#include <vector>
#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
namespace qpid {
namespace broker {
+class Exchange;
+class MessageStore;
class SessionContext;
+class SessionState;
/**
*
@@ -94,28 +98,28 @@ class SemanticState : private boost::non
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription* mgmtObject;
- bool checkCredit(boost::intrusive_ptr<Message>& msg);
- void allocateCredit(boost::intrusive_ptr<Message>& msg);
+ bool checkCredit(const Message& msg);
+ void allocateCredit(const Message& msg);
bool haveCredit();
protected:
QPID_BROKER_EXTERN virtual bool doDispatch();
size_t unacked() { return parent->unacked.size(); }
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- QPID_BROKER_EXTERN ConsumerImpl(
- SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId, uint64_t resumeTtl,
- const framing::FieldTable& arguments);
- QPID_BROKER_EXTERN virtual ~ConsumerImpl();
+ QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, SubscriptionType type, bool exclusive,
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
+ QPID_BROKER_EXTERN ~ConsumerImpl();
QPID_BROKER_EXTERN OwnershipToken* getSession();
- QPID_BROKER_EXTERN virtual bool deliver(QueuedMessage& msg);
- QPID_BROKER_EXTERN bool filter(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN bool accept(boost::intrusive_ptr<Message> msg);
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+ QPID_BROKER_EXTERN bool filter(const Message&);
+ QPID_BROKER_EXTERN bool accept(const Message&);
QPID_BROKER_EXTERN void cancel() {}
QPID_BROKER_EXTERN void disableNotify();
@@ -153,7 +157,7 @@ class SemanticState : private boost::non
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
- void acknowledged(const broker::QueuedMessage&) {}
+ void acknowledged(const DeliveryRecord&) {}
// manageable entry points
QPID_BROKER_EXTERN management::ManagementObject*
@@ -168,8 +172,7 @@ class SemanticState : private boost::non
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
- SessionContext& session;
- DeliveryAdapter& deliveryAdapter;
+ SessionState& session;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
DeliveryRecords unacked;
@@ -185,7 +188,6 @@ class SemanticState : private boost::non
//needed for queue delete events in auto-delete:
const std::string connectionId;
- void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -196,11 +198,11 @@ class SemanticState : private boost::non
public:
- SemanticState(DeliveryAdapter&, SessionContext&);
+ SemanticState(SessionState&);
~SemanticState();
- SessionContext& getSession() { return session; }
- const SessionContext& getSession() const { return session; }
+ SessionContext& getSession();
+ const SessionContext& getSession() const;
const ConsumerImpl::shared_ptr find(const std::string& destination) const;
bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
@@ -239,12 +241,12 @@ class SemanticState : private boost::non
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void recover(bool requeue);
- void deliver(DeliveryRecord& message, bool sync);
+ TxBuffer* getTxBuffer();
+ void requeue();
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
- void handle(boost::intrusive_ptr<Message> msg);
+ void route(Message& msg, Deliverable& strategy);
void completed(const framing::SequenceSet& commands);
void accepted(const framing::SequenceSet& commands);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug 10 12:04:27 2012
@@ -264,7 +264,7 @@ QueueQueryResult SessionAdapter::QueueHa
queue->isDurable(),
queue->hasExclusiveOwner(),
queue->isAutoDelete(),
- queue->getSettings(),
+ queue->getEncodableSettings(),
queue->getMessageCount(),
queue->getConsumerCount());
} else {
@@ -294,19 +294,24 @@ void SessionAdapter::QueueHandlerImpl::d
queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
+ QueueSettings settings(durable, autoDelete);
+ try {
+ settings.populate(arguments, settings.storeSettings);
+ } catch (const qpid::types::Exception& e) {
+ throw InvalidArgumentException(e.what());
+ }
+
std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().createQueue(name, durable,
- autoDelete,
+ getBroker().createQueue(name, settings,
exclusive ? &session : 0,
alternateExchange,
- arguments,
getConnection().getUserId(),
getConnection().getUrl());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
//handle automatic cleanup:
- if (exclusive) {
+ if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Aug 10 12:04:27 2012
@@ -32,7 +32,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+ : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out),
clusterOrderProxy(c.getClusterOrderOutput() ?
@@ -75,7 +75,7 @@ ConnectionState& SessionHandler::getConn
const ConnectionState& SessionHandler::getConnection() const { return connection; }
void SessionHandler::handleDetach() {
- amqp_0_10::SessionHandler::handleDetach();
+ qpid::amqp_0_10::SessionHandler::handleDetach();
assert(&connection.getChannel(channel.get()) == this);
if (session.get())
connection.getBroker().getSessionManager().detach(session);
@@ -125,7 +125,7 @@ void SessionHandler::attached(const std:
{
if (session.get()) {
session->addManagementObject(); // Delayed from attachAs()
- amqp_0_10::SessionHandler::attached(name);
+ qpid::amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Aug 10 12:04:27 2012
@@ -41,7 +41,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public amqp_0_10::SessionHandler {
+class SessionHandler : public qpid::amqp_0_10::SessionHandler {
public:
class ErrorListener {
public:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org