You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/05 14:14:39 UTC
svn commit: r482639 - in /incubator/qpid/trunk/qpid/cpp/lib/broker:
BrokerMessage.cpp BrokerMessage.h BrokerQueue.cpp BrokerQueue.h Makefile.am
QueuePolicy.cpp QueuePolicy.h SessionHandlerImpl.cpp
Author: gsim
Date: Tue Dec 5 05:14:38 2006
New Revision: 482639
URL: http://svn.apache.org/viewvc?view=rev&rev=482639
Log:
Added queue policy class for controlling when message content should be released from memory.
Added:
incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp (with props)
incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Tue Dec 5 05:14:38 2006
@@ -32,6 +32,7 @@
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
Message::Message(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
@@ -100,6 +101,7 @@
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
+ Mutex::ScopedLock locker(contentLock);
if (content.get()) content->send(out, channel, framesize);
}
@@ -173,6 +175,7 @@
void Message::encodeContent(Buffer& buffer)
{
+ Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
@@ -183,6 +186,7 @@
u_int32_t Message::encodedContentSize()
{
+ Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
@@ -200,6 +204,7 @@
void Message::releaseContent(MessageStore* store)
{
+ Mutex::ScopedLock locker(contentLock);
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored content):
@@ -212,5 +217,6 @@
void Message::setContent(std::auto_ptr<Content>& _content)
{
+ Mutex::ScopedLock locker(contentLock);
content = _content;
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h Tue Dec 5 05:14:38 2006
@@ -23,13 +23,14 @@
#include <memory>
#include <boost/shared_ptr.hpp>
-#include <ConnectionToken.h>
-#include <Content.h>
-#include <TxBuffer.h>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
#include <BasicHeaderProperties.h>
+#include <ConnectionToken.h>
+#include <Content.h>
#include <OutputHandler.h>
+#include <Mutex.h>
+#include <TxBuffer.h>
namespace qpid {
namespace broker {
@@ -52,6 +53,7 @@
std::auto_ptr<Content> content;
u_int64_t size;
u_int64_t persistenceId;
+ qpid::sys::Mutex contentLock;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize);
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Tue Dec 5 05:14:38 2006
@@ -26,6 +26,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid::framing;
Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
@@ -62,8 +63,7 @@
}
void Queue::recover(Message::shared_ptr& msg){
- queueing = true;
- messages.push(msg);
+ push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
msg->releaseContent(store);
}
@@ -72,8 +72,7 @@
void Queue::process(Message::shared_ptr& msg){
Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
+ push(msg);
}
}
@@ -116,7 +115,7 @@
while(proceed){
Mutex::ScopedLock locker(lock);
if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
+ pop();
}else{
dispatching = false;
proceed = false;
@@ -149,7 +148,7 @@
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
- messages.pop();
+ pop();
}
return msg;
}
@@ -157,10 +156,19 @@
u_int32_t Queue::purge(){
Mutex::ScopedLock locker(lock);
int count = messages.size();
- while(!messages.empty()) messages.pop();
+ while(!messages.empty()) pop();
return count;
}
+void Queue::pop(){ // single point through which the head message leaves 'messages'; takes no lock itself
+ messages.pop(); // NOTE(review): 'policy' is not consulted here - presumably QueuePolicy::dequeued() is meant to be hooked in; confirm
+}
+
+void Queue::push(Message::shared_ptr& msg){ // appends msg to 'messages'; takes no lock itself
+ queueing = true; // once set, process() queues further messages instead of dispatching them directly
+ messages.push(msg); // NOTE(review): 'policy' is not consulted here - presumably QueuePolicy::enqueued() is meant to be hooked in; confirm
+}
+
u_int32_t Queue::getMessageCount() const{
Mutex::ScopedLock locker(lock);
return messages.size();
@@ -190,8 +198,30 @@
}
}
-void Queue::create()
+namespace
{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+ //Note: currently a field table can only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy. 0 means "no limit".
+ u_int32_t maxCount(0);
+ try {
+ maxCount = settings.getInt(qpidMaxCount); // "qpid.max_count" limits the number of messages
+ } catch (FieldNotFoundException& ignore) { // key absent: keep 0
+ }
+ u_int32_t maxSize(0);
+ try {
+ maxSize = settings.getInt(qpidMaxSize); // "qpid.max_size" limits the total content size
+ } catch (FieldNotFoundException& ignore) { // key absent: keep 0
+ }
+ if (maxCount || maxSize) { // only install a policy when at least one limit was requested
+ setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
+ }
+
if (store) {
store->create(*this);
}
@@ -202,4 +232,9 @@
if (store) {
store->destroy(*this);
}
+}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy; // auto_ptr assignment: ownership moves into the member and any previously held policy is deleted
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Tue Dec 5 05:14:38 2006
@@ -22,6 +22,7 @@
#define _Queue_
#include <vector>
+#include <memory>
#include <queue>
#include <boost/shared_ptr.hpp>
#include <amqp_types.h>
@@ -29,7 +30,9 @@
#include <ConnectionToken.h>
#include <Consumer.h>
#include <BrokerMessage.h>
+#include <FieldTable.h>
#include <sys/Monitor.h>
+#include <QueuePolicy.h>
namespace qpid {
namespace broker {
@@ -41,6 +44,7 @@
struct ExclusiveAccessException{};
using std::string;
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -62,9 +66,13 @@
int64_t lastUsed;
Consumer* exclusive;
mutable u_int64_t persistenceId;
+ std::auto_ptr<QueuePolicy> policy;
+ void pop();
+ void push(Message::shared_ptr& msg);
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
+ void setPolicy(std::auto_ptr<QueuePolicy> policy);
public:
@@ -77,7 +85,7 @@
const ConnectionToken* const owner = 0);
~Queue();
- void create();
+ void create(const qpid::framing::FieldTable& settings);
void destroy();
/**
* Informs the queue of a binding that should be cancelled on
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am Tue Dec 5 05:14:38 2006
@@ -59,6 +59,8 @@
NullMessageStore.cpp \
NullMessageStore.h \
Prefetch.h \
+ QueuePolicy.cpp \
+ QueuePolicy.h \
QueueRegistry.cpp \
QueueRegistry.h \
RecoveryManager.cpp \
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp?view=auto&rev=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp Tue Dec 5 05:14:38 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+
+using namespace qpid::broker;
+
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} // running tallies must start from zero
+
+void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
+{
+ const bool overCount = checkCount(msg); // both checks must always run: each one updates a
+ const bool overSize = checkSize(msg); // tally that dequeued() later decrements unconditionally
+ if (overCount || overSize) msg->releaseContent(store); // over a limit: release this message's content
+}
+
+void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/)
+{
+ if (maxCount) count--; // count is only tracked when a count limit is set (maxCount == 0 disables it)
+ if (maxSize) size -= msg->contentSize(); // likewise size is only tracked when a size limit is set
+}
+
+bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/)
+{
+ return maxCount && ++count > maxCount; // not a pure check: increments count; true once it exceeds maxCount; always false (and no increment) when maxCount is 0
+}
+
+bool QueuePolicy::checkSize(Message::shared_ptr& msg)
+{
+ return maxSize && (size += msg->contentSize()) > maxSize; // not a pure check: adds the message's content size to size; true once it exceeds maxSize; always false (and no update) when maxSize is 0
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h?view=auto&rev=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h Tue Dec 5 05:14:38 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuePolicy_
+#define _QueuePolicy_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class QueuePolicy // tallies enqueued messages against optional limits and releases a message's content once a limit is exceeded
+ {
+ const u_int32_t maxCount; // limit on the message tally; 0 disables the count check
+ const u_int64_t maxSize; // limit on the content-size tally; 0 disables the size check
+ u_int32_t count; // running message tally: incremented by checkCount(), decremented by dequeued()
+ u_int64_t size; // running content-size tally: increased by checkSize(), decreased by dequeued()
+
+ bool checkCount(Message::shared_ptr& msg); // updates count, returns true when it exceeds maxCount
+ bool checkSize(Message::shared_ptr& msg); // updates size, returns true when it exceeds maxSize
+ public:
+ QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+ void enqueued(Message::shared_ptr& msg, MessageStore* store); // to be called when msg is added; may call msg->releaseContent(store)
+ void dequeued(Message::shared_ptr& msg, MessageStore* store); // to be called when msg is removed; store is unused
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Tue Dec 5 05:14:38 2006
@@ -256,7 +256,7 @@
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
@@ -268,8 +268,8 @@
if (queue_created.second) { // This is a new queue
parent->getChannel(channel)->setDefaultQueue(queue);
- //create persistent record if required
- queue_created.first->create();
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
//add default binding:
parent->exchanges->getDefault()->bind(queue, name, 0);