You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/11/13 01:34:14 UTC
svn commit: r594364 [1/2] - in /incubator/qpid/trunk/qpid/cpp/src: ./
qpid/broker/ qpid/broker/management/ qpid/management/
Author: cctrieloff
Date: Mon Nov 12 16:34:09 2007
New Revision: 594364
URL: http://svn.apache.org/viewvc?rev=594364&view=rev
Log:
Patch QPID-680 from tross
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Args.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ArgsBrokerEcho.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Manageable.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Nov 12 16:34:09 2007
@@ -160,12 +160,6 @@
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
- qpid/broker/management/ManagementAgent.cpp \
- qpid/broker/management/ManagementExchange.cpp \
- qpid/broker/management/ManagementObject.cpp \
- qpid/broker/management/ManagementObjectBroker.cpp \
- qpid/broker/management/ManagementObjectQueue.cpp \
- qpid/broker/management/ManagementObjectVhost.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -193,7 +187,14 @@
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
- qpid/broker/TxPublish.cpp
+ qpid/broker/TxPublish.cpp \
+ qpid/broker/Vhost.cpp \
+ qpid/management/Broker.cpp \
+ qpid/management/ManagementAgent.cpp \
+ qpid/management/ManagementExchange.cpp \
+ qpid/management/ManagementObject.cpp \
+ qpid/management/Queue.cpp \
+ qpid/management/Vhost.cpp
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
@@ -263,12 +264,6 @@
qpid/broker/HandlerImpl.h \
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
- qpid/broker/management/ManagementAgent.h \
- qpid/broker/management/ManagementExchange.h \
- qpid/broker/management/ManagementObject.h \
- qpid/broker/management/ManagementObjectBroker.h \
- qpid/broker/management/ManagementObjectQueue.h \
- qpid/broker/management/ManagementObjectVhost.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
@@ -302,6 +297,7 @@
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
qpid/broker/TxPublish.h \
+ qpid/broker/Vhost.h \
qpid/client/AckMode.h \
qpid/client/ChainableFrameHandler.h \
qpid/client/Channel.h \
@@ -386,6 +382,15 @@
qpid/log/Options.h \
qpid/log/Selector.h \
qpid/log/Statement.h \
+ qpid/management/ArgsBrokerEcho.h \
+ qpid/management/Args.h \
+ qpid/management/Broker.h \
+ qpid/management/Manageable.h \
+ qpid/management/ManagementAgent.h \
+ qpid/management/ManagementExchange.h \
+ qpid/management/ManagementObject.h \
+ qpid/management/Queue.h \
+ qpid/management/Vhost.h \
qpid/sys/Acceptor.h \
qpid/sys/AsynchIO.h \
qpid/sys/AtomicCount.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Nov 12 16:34:09 2007
@@ -28,7 +28,8 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
-#include "management/ManagementExchange.h"
+#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ArgsBrokerEcho.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
@@ -50,6 +51,11 @@
using qpid::framing::HandlerUpdater;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::management::ArgsBrokerEcho;
namespace qpid {
namespace broker {
@@ -89,7 +95,7 @@
("store-async", optValue(storeAsync,"yes|no"),
"Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
("store-force", optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is change. Be SHURE you want to do this")
+ "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this")
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -115,33 +121,23 @@
sessionManager(conf.ack)
{
if(conf.enableMgmt){
- managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
+ managementAgent = ManagementAgent::getAgent ();
+ managementAgent->setInterval (conf.mgmtPubInterval);
- mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf));
- managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf));
+ managementAgent->addObject (mgmtObject);
- // Since there is currently no support for virtual hosts, a management object
- // representing the implied single virtual host is added here.
- mgmtVhostObject = ManagementObjectVhost::shared_ptr
- (new ManagementObjectVhost (mgmtObject->getObjectId (), conf));
- managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject));
+ // Since there is currently no support for virtual hosts, a placeholder object
+ // representing the implied single virtual host is added here to keep the
+ // management schema correct.
+ Vhost* vhost = new Vhost (this);
+ vhostObject = Vhost::shared_ptr (vhost);
- queues.setManagementAgent (managementAgent);
- queues.setManagementVhost (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject));
+ queues.setParent (vhost);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if(conf.enableMgmt) {
- QPID_LOG(info, "Management enabled");
- exchanges.declare(qpid_management, ManagementExchange::typeName);
- Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
- managementAgent->setExchange (mExchange);
- dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
- }
- else
- QPID_LOG(info, "Management not enabled");
-
if(store.get()) {
if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){
throw Exception( "Existing Journal in different mode, backup/move existing data \
@@ -158,6 +154,17 @@
declareStandardExchange(amq_fanout, FanOutExchange::typeName);
declareStandardExchange(amq_match, HeadersExchange::typeName);
+ if(conf.enableMgmt) {
+ QPID_LOG(info, "Management enabled");
+ exchanges.declare(qpid_management, ManagementExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ managementAgent->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ }
+ else
+ QPID_LOG(info, "Management not enabled");
+
// Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
for (Plugin::Plugins::const_iterator i = plugins.begin();
@@ -234,6 +241,18 @@
for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
boost::bind(&HandlerUpdater::update, _1,
channel, boost::ref(chains)));
+}
+
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/,
+ Args& /*_args*/)
+{
+ QPID_LOG (debug, "Broker::ManagementMethod");
+ return Manageable::STATUS_OK;
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Nov 12 16:34:09 2007
@@ -30,9 +30,10 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
-#include "management/ManagementAgent.h"
-#include "management/ManagementObjectBroker.h"
-#include "management/ManagementObjectVhost.h"
+#include "Vhost.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/Broker.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
#include "qpid/Url.h"
@@ -55,7 +56,7 @@
/**
* A broker instance.
*/
-class Broker : public sys::Runnable, public Plugin::Target
+class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable
{
public:
struct Options : public qpid::Options {
@@ -114,7 +115,10 @@
DtxManager& getDtxManager() { return dtxManager; }
SessionManager& getSessionManager() { return sessionManager; }
- ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; }
+
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
private:
sys::Acceptor& getAcceptor() const;
@@ -131,9 +135,9 @@
DtxManager dtxManager;
HandlerUpdaters handlerUpdaters;
SessionManager sessionManager;
- ManagementAgent::shared_ptr managementAgent;
- ManagementObjectBroker::shared_ptr mgmtObject;
- ManagementObjectVhost::shared_ptr mgmtVhostObject;
+ management::ManagementAgent::shared_ptr managementAgent;
+ management::Broker::shared_ptr mgmtObject;
+ Vhost::shared_ptr vhostObject;
static MessageStore* createStore(const Options& config);
void declareStandardExchange(const std::string& name, const std::string& type);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Mon Nov 12 16:34:09 2007
@@ -23,7 +23,7 @@
#include "FanOutExchange.h"
#include "HeadersExchange.h"
#include "TopicExchange.h"
-#include "management/ManagementExchange.h"
+#include "qpid/management/ManagementExchange.h"
#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Nov 12 16:34:09 2007
@@ -89,6 +89,13 @@
frames.map_if(f2, TypeFilter(CONTENT_BODY));
}
+void Message::encodeContent(framing::Buffer& buffer) const
+{
+ //encode the payload of each content frame
+ EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter(CONTENT_BODY));
+}
+
uint32_t Message::encodedSize() const
{
return encodedHeaderSize() + encodedContentSize();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Nov 12 16:34:09 2007
@@ -91,6 +91,7 @@
uint32_t getRequiredCredit() const;
void encode(framing::Buffer& buffer) const;
+ void encodeContent(framing::Buffer& buffer) const;
/**
* @returns the size of the buffer needed to encode this
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 12 16:34:09 2007
@@ -36,10 +36,15 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
- const ConnectionToken* const _owner) :
+ const ConnectionToken* const _owner,
+ Manageable* parent) :
name(_name),
autodelete(_autodelete),
@@ -50,9 +55,21 @@
serializer(false),
dispatchCallback(*this)
{
+ if (parent != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete));
+
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject);
+ }
}
-Queue::~Queue(){}
+Queue::~Queue()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
void Queue::notifyDurableIOComplete()
{
@@ -79,7 +96,7 @@
mgmtObject->enqueue (msg->contentSize ());
}else {
if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -92,7 +109,7 @@
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -102,10 +119,10 @@
void Queue::process(Message::shared_ptr& msg){
- uint32_t mask = MSG_MASK_TX;
+ uint32_t mask = management::MSG_MASK_TX;
if (msg->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ mask |= management::MSG_MASK_PERSIST;
push(msg);
if (mgmtObject != 0)
@@ -327,7 +344,7 @@
uint32_t mask = 0;
if (msg.payload->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ mask |= management::MSG_MASK_PERSIST;
mgmtObject->dequeue (msg.payload->contentSize (), mask);
}
@@ -570,4 +587,15 @@
}
if (sync) sync->completed();
+}
+
+ManagementObject::shared_ptr Queue::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+ Args& /*args*/)
+{
+ return Manageable::STATUS_OK;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Nov 12 16:34:09 2007
@@ -35,7 +35,8 @@
#include "PersistableQueue.h"
#include "QueuePolicy.h"
#include "QueueBindings.h"
-#include "management/ManagementObjectQueue.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Queue.h"
namespace qpid {
namespace broker {
@@ -59,7 +60,7 @@
* registered consumers or be stored until dequeued or until one
* or more consumers registers.
*/
- class Queue : public PersistableQueue {
+ class Queue : public PersistableQueue, public management::Manageable {
typedef std::vector<Consumer::ptr> Consumers;
typedef std::deque<QueuedMessage> Messages;
@@ -94,7 +95,7 @@
qpid::sys::Serializer<DispatchFunctor> serializer;
DispatchFunctor dispatchCallback;
framing::SequenceNumber sequence;
- ManagementObjectQueue::shared_ptr mgmtObject;
+ management::Queue::shared_ptr mgmtObject;
void pop();
void push(Message::shared_ptr& msg);
@@ -122,7 +123,8 @@
Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
- const ConnectionToken* const owner = 0);
+ const ConnectionToken* const owner = 0,
+ Manageable* parent = 0);
~Queue();
void create(const qpid::framing::FieldTable& settings);
@@ -130,8 +132,6 @@
void destroy();
void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
- void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; }
- ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; }
bool acquire(const QueuedMessage& msg);
@@ -203,6 +203,11 @@
static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Nov 12 16:34:09 2007
@@ -19,8 +19,6 @@
*
*/
#include "QueueRegistry.h"
-#include "management/ManagementAgent.h"
-#include "management/ManagementObjectQueue.h"
#include "qpid/log/Statement.h"
#include <sstream>
#include <assert.h>
@@ -29,7 +27,7 @@
using namespace qpid::sys;
QueueRegistry::QueueRegistry(MessageStore* const _store) :
- counter(1), store(_store) {}
+ counter(1), store(_store), parent(0) {}
QueueRegistry::~QueueRegistry(){}
@@ -43,17 +41,9 @@
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
queues[name] = queue;
- if (managementAgent){
- ManagementObjectQueue::shared_ptr mgmtObject
- (new ManagementObjectQueue (managementVhost->getObjectId (), name, durable, autoDelete));
-
- queue->setMgmt (mgmtObject);
- managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
- }
-
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
return std::pair<Queue::shared_ptr, bool>(i->second, false);
@@ -61,16 +51,6 @@
}
void QueueRegistry::destroyLH (const string& name){
- if (managementAgent){
- ManagementObjectQueue::shared_ptr mgmtObject;
- QueueMap::iterator i = queues.find(name);
-
- if (i != queues.end()){
- mgmtObject = i->second->getMgmt ();
- mgmtObject->resourceDestroy ();
- }
- }
-
queues.erase(name);
}
@@ -104,19 +84,4 @@
MessageStore* const QueueRegistry::getStore() const {
return store;
-}
-
-void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent)
-{
- managementAgent = agent;
-}
-
-ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void)
-{
- return managementAgent;
-}
-
-void QueueRegistry::setManagementVhost (ManagementObject::shared_ptr vhost)
-{
- managementVhost = vhost;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Nov 12 16:34:09 2007
@@ -24,7 +24,7 @@
#include <map>
#include "qpid/sys/Mutex.h"
#include "Queue.h"
-#include "management/ManagementAgent.h"
+#include "qpid/management/Manageable.h"
namespace qpid {
namespace broker {
@@ -89,22 +89,19 @@
* Return the message store used.
*/
MessageStore* const getStore() const;
-
+
/**
- * Set/Get the ManagementAgent in use.
+ * Register the manageable parent for declared queues
*/
- void setManagementAgent (ManagementAgent::shared_ptr agent);
- ManagementAgent::shared_ptr getManagementAgent (void);
- void setManagementVhost (ManagementObject::shared_ptr vhost);
-
+ void setParent (management::Manageable* _parent) { parent = _parent; }
+
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
qpid::sys::RWlock lock;
int counter;
MessageStore* const store;
- ManagementAgent::shared_ptr managementAgent;
- ManagementObject::shared_ptr managementVhost;
+ management::Manageable* parent;
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Mon Nov 12 16:34:09 2007
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "Vhost.h"
+#include "qpid/management/ManagementAgent.h"
+
+using namespace qpid::broker;
+using qpid::management::ManagementAgent;
+
+Vhost::Vhost (management::Manageable* parentBroker)
+{
+ if (parentBroker != 0)
+ {
+ mgmtObject = management::Vhost::shared_ptr
+ (new management::Vhost (this, parentBroker));
+
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject);
+ }
+}
+
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h Mon Nov 12 16:34:09 2007
@@ -0,0 +1,51 @@
+#ifndef _Vhost_
+#define _Vhost_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Vhost.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Vhost : public management::Manageable
+{
+ private:
+
+ management::Vhost::shared_ptr mgmtObject;
+
+ public:
+
+ typedef boost::shared_ptr<Vhost> shared_ptr;
+
+ Vhost (management::Manageable* parentBroker);
+
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
+ { return mgmtObject; }
+
+ management::Manageable::status_t ManagementMethod (uint32_t, management::Args&)
+ { return management::Manageable::STATUS_OK; }
+};
+
+}}
+
+#endif /*!_Vhost_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.cpp Mon Nov 12 16:34:09 2007
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementAgent.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
-#include <qpid/broker/MessageDelivery.h>
-#include <qpid/framing/AMQFrame.h>
-#include <list>
-
-using namespace qpid::framing;
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval), nextObjectId(1)
-{
- timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
-}
-
-void ManagementAgent::setExchange (Exchange::shared_ptr _exchange)
-{
- exchange = _exchange;
-}
-
-void ManagementAgent::addObject (ManagementObject::shared_ptr object)
-{
- object->setObjectId (nextObjectId++);
- managementObjects.push_back (object);
- QPID_LOG(info, "Management Object Added");
-}
-
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
-
-void ManagementAgent::Periodic::fire ()
-{
- agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval)));
- agent.PeriodicProcessing ();
-}
-
-void ManagementAgent::clientAdded (void)
-{
- for (ManagementObjectVector::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = *iter;
- object->setAllChanged ();
- object->setSchemaNeeded ();
- }
-}
-
-void ManagementAgent::PeriodicProcessing (void)
-{
-#define BUFSIZE 65536
-#define THRESHOLD 16384
- char msgChars[BUFSIZE];
- Buffer msgBuffer (msgChars, BUFSIZE);
- uint32_t contentSize;
- std::list<uint32_t> deleteList;
-
- if (managementObjects.empty ())
- return;
-
- Message::shared_ptr msg (new Message ());
-
- // Build the magic number for the management message.
- msgBuffer.putOctet ('A');
- msgBuffer.putOctet ('M');
- msgBuffer.putOctet ('0');
- msgBuffer.putOctet ('1');
-
- for (uint32_t idx = 0; idx < managementObjects.size (); idx++)
- {
- ManagementObject::shared_ptr object = managementObjects[idx];
-
- if (object->getSchemaNeeded ())
- {
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('S'); // opcode = Schema Record
- msgBuffer.putOctet (0); // content-class = N/A
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- object->writeSchema (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- if (object->getConfigChanged ())
- {
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('C'); // content-class = Configuration
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- object->writeConfig (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- if (object->getInstChanged ())
- {
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('I'); // content-class = Instrumentation
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- object->writeInstrumentation (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- if (object->isDeleted ())
- deleteList.push_back (idx);
-
- // Temporary protection against buffer overrun.
- // This needs to be replaced with frame fragmentation.
- if (msgBuffer.available () < THRESHOLD)
- break;
- }
-
- msgBuffer.putOctet ('X'); // End-of-message
- msgBuffer.putOctet (0);
- msgBuffer.putShort (0);
- msgBuffer.putLong (8);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
-
- AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
- 0, "qpid.management", 0, 0));
- AMQFrame header (0, AMQHeaderBody());
- AMQFrame content;
-
- content.setBody(AMQContentBody());
- content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(contentSize);
- msg->getFrames().append(content);
-
- DeliverableMessage deliverable (msg);
- exchange->route (deliverable, "mgmt", 0);
-
- // Delete flagged objects
- for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
- iter++)
- {
- managementObjects.erase (managementObjects.begin () + *iter);
- }
- deleteList.clear ();
-}
-
-void ManagementAgent::dispatchCommand (Deliverable& /*msg*/,
- const string& routingKey,
- const FieldTable* /*args*/)
-{
- size_t pos, start;
-
- if (routingKey.compare (0, 7, "method.") != 0)
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-
- start = 7;
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing class-key in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-key in routing key: " << routingKey);
- return;
- }
-
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
-
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- QPID_LOG (debug, "Dispatch class: " << className << ", method: " << methodName);
-}
-
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementAgent.h Mon Nov 12 16:34:09 2007
@@ -1,74 +0,0 @@
-#ifndef _ManagementAgent_
-#define _ManagementAgent_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/Options.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
-#include "ManagementObject.h"
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-
-
-class ManagementAgent
-{
- public:
-
- typedef boost::shared_ptr<ManagementAgent> shared_ptr;
-
- ManagementAgent (uint16_t interval);
-
- void setExchange (Exchange::shared_ptr exchange);
- void addObject (ManagementObject::shared_ptr object);
- void clientAdded (void);
- void dispatchCommand (Deliverable& msg,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
- private:
-
- struct Periodic : public TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- ~Periodic () {}
- void fire ();
- };
-
- ManagementObjectVector managementObjects;
- Timer timer;
- Exchange::shared_ptr exchange;
- uint16_t interval;
- uint32_t nextObjectId;
-
- void PeriodicProcessing (void);
-};
-
-}}
-
-
-
-#endif /*!_ManagementAgent_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.cpp Mon Nov 12 16:34:09 2007
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementExchange.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-ManagementExchange::ManagementExchange (const string& _name) :
- Exchange (_name), TopicExchange(_name) {}
-ManagementExchange::ManagementExchange (const std::string& _name,
- bool _durable,
- const FieldTable& _args) :
- Exchange (_name, _durable, _args),
- TopicExchange(_name, _durable, _args) {}
-
-
-bool ManagementExchange::bind (Queue::shared_ptr queue,
- const string& routingKey,
- const FieldTable* args)
-{
- bool result = TopicExchange::bind (queue, routingKey, args);
-
- // Notify the management agent that a new management client has bound to the
- // exchange.
- if (result)
- managementAgent->clientAdded ();
-
- return result;
-}
-
-void ManagementExchange::route (Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
-{
- // Intercept management commands
- if (routingKey.length () > 7 &&
- routingKey.substr (0, 7).compare ("method.") == 0)
- {
- managementAgent->dispatchCommand (msg, routingKey, args);
- return;
- }
-
- TopicExchange::route (msg, routingKey, args);
-}
-
-void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
-{
- managementAgent = agent;
-}
-
-
-ManagementExchange::~ManagementExchange() {}
-
-const std::string ManagementExchange::typeName("management");
-
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementExchange.h Mon Nov 12 16:34:09 2007
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ManagementExchange_
-#define _ManagementExchange_
-
-#include "qpid/broker/TopicExchange.h"
-#include "ManagementAgent.h"
-
-namespace qpid {
-namespace broker {
-
-class ManagementExchange : public virtual TopicExchange
-{
- private:
- ManagementAgent::shared_ptr managementAgent;
-
- public:
- static const std::string typeName;
-
- ManagementExchange (const string& name);
- ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
-
- virtual std::string getType() const { return typeName; }
-
- virtual bool bind (Queue::shared_ptr queue,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
- virtual void route (Deliverable& msg,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
- void setManagmentAgent (ManagementAgent::shared_ptr agent);
-
- virtual ~ManagementExchange();
-};
-
-
-}
-}
-
-#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.cpp Mon Nov 12 16:34:09 2007
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementObject.h"
-
-using namespace qpid::framing;
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-void ManagementObject::schemaItem (Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig,
- bool isIndex)
-{
- uint8_t flags =
- (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
-
- buf.putOctet (flags);
- buf.putOctet (typeCode);
- buf.putShortString (name);
- buf.putShortString (description);
-}
-
-void ManagementObject::schemaListBegin (Buffer& buf)
-{
- schemaItem (buf, TYPE_UINT32, "id", "Object ID", true, true);
-}
-
-void ManagementObject::schemaListEnd (Buffer& buf)
-{
- buf.putOctet (FLAG_END);
-}
-
-void ManagementObject::writeTimestamps (Buffer& buf)
-{
- buf.putLongLong (uint64_t (Duration (now ())));
- buf.putLongLong (createTime);
- buf.putLongLong (destroyTime);
- buf.putLong (objectId);
-}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObject.h Mon Nov 12 16:34:09 2007
@@ -1,118 +0,0 @@
-#ifndef _ManagementObject_
-#define _ManagementObject_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-#include <qpid/framing/Buffer.h>
-#include <boost/shared_ptr.hpp>
-#include <vector>
-
-namespace qpid {
-namespace broker {
-
-const uint16_t OBJECT_SYSTEM = 1;
-const uint16_t OBJECT_BROKER = 2;
-const uint16_t OBJECT_VHOST = 3;
-const uint16_t OBJECT_QUEUE = 4;
-const uint16_t OBJECT_EXCHANGE = 5;
-const uint16_t OBJECT_BINDING = 6;
-const uint16_t OBJECT_CLIENT = 7;
-const uint16_t OBJECT_SESSION = 8;
-const uint16_t OBJECT_DESTINATION = 9;
-const uint16_t OBJECT_PRODUCER = 10;
-const uint16_t OBJECT_CONSUMER = 11;
-
-
-class ManagementObject
-{
- protected:
-
- uint64_t createTime;
- uint64_t destroyTime;
- uint32_t objectId;
- bool configChanged;
- bool instChanged;
- bool deleted;
-
- static const uint8_t TYPE_UINT8 = 1;
- static const uint8_t TYPE_UINT16 = 2;
- static const uint8_t TYPE_UINT32 = 3;
- static const uint8_t TYPE_UINT64 = 4;
- static const uint8_t TYPE_BOOL = 5;
- static const uint8_t TYPE_STRING = 6;
-
- static const uint8_t FLAG_CONFIG = 0x01;
- static const uint8_t FLAG_INDEX = 0x02;
- static const uint8_t FLAG_END = 0x80;
-
- void schemaItem (qpid::framing::Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig = false,
- bool isIndex = false);
- void schemaListBegin (qpid::framing::Buffer& buf);
- void schemaListEnd (qpid::framing::Buffer& buf);
- void writeTimestamps (qpid::framing::Buffer& buf);
-
- public:
- typedef boost::shared_ptr<ManagementObject> shared_ptr;
-
- ManagementObject () : destroyTime(0), objectId (), configChanged(true),
- instChanged(true), deleted(false)
- { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
- virtual ~ManagementObject () {}
-
- virtual uint16_t getObjectType (void) = 0;
- virtual std::string getObjectName (void) = 0;
- virtual void writeSchema (qpid::framing::Buffer& buf) = 0;
- virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
- virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
- virtual bool getSchemaNeeded (void) = 0;
- virtual void setSchemaNeeded (void) = 0;
-
- void setObjectId (uint32_t oid) { objectId = oid; }
- uint32_t getObjectId (void) { return objectId; }
- inline bool getConfigChanged (void) { return configChanged; }
- virtual bool getInstChanged (void) { return instChanged; }
- inline void setAllChanged (void)
- {
- configChanged = true;
- instChanged = true;
- }
-
- inline void resourceDestroy (void) {
- destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
- deleted = true;
- }
- bool isDeleted (void) { return deleted; }
-
-};
-
- typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
-
-}}
-
-
-
-#endif /*!_ManagementObject_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp Mon Nov 12 16:34:09 2007
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "config.h"
-#include "qpid/broker/Broker.h"
-#include "ManagementObjectBroker.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-bool ManagementObjectBroker::schemaNeeded = true;
-
-ManagementObjectBroker::ManagementObjectBroker (const Options& _conf)
-{
- Broker::Options& conf = (Broker::Options&) _conf;
-
- sysId = "sysId";
- port = conf.port;
- workerThreads = conf.workerThreads;
- maxConns = conf.maxConnections;
- connBacklog = conf.connectionBacklog;
- stagingThreshold = conf.stagingThreshold;
- storeLib = conf.store;
- asyncStore = conf.storeAsync;
- mgmtPubInterval = conf.mgmtPubInterval;
- initialDiskPageSize = 0;
- initialPagesPerQueue = 0;
- clusterName = "";
- version = PACKAGE_VERSION;
-}
-
-ManagementObjectBroker::~ManagementObjectBroker () {}
-
-void ManagementObjectBroker::writeSchema (Buffer& buf)
-{
- schemaNeeded = false;
-
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true);
- schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true);
- schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
- schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true);
- schemaItem (buf, TYPE_UINT16, "connBacklog",
- "Connection backlog limit for listening socket", true);
- schemaItem (buf, TYPE_UINT32, "stagingThreshold",
- "Broker stages messages over this size to disk", true);
- schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true);
- schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true);
- schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
- schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
- "Number of disk pages allocated for storage", true);
- schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
- "Number of disk pages allocated per queue", true);
- schemaItem (buf, TYPE_STRING, "clusterName",
- "Name of cluster this server is a member of, zero-length for standalone server", true);
- schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
- schemaListEnd (buf);
-}
-
-void ManagementObjectBroker::writeConfig (Buffer& buf)
-{
- configChanged = false;
-
- writeTimestamps (buf);
- buf.putLong (0);
- buf.putShort (port);
- buf.putShort (workerThreads);
- buf.putShort (maxConns);
- buf.putShort (connBacklog);
- buf.putLong (stagingThreshold);
- buf.putShortString (storeLib);
- buf.putOctet (asyncStore ? 1 : 0);
- buf.putShort (mgmtPubInterval);
- buf.putLong (initialDiskPageSize);
- buf.putLong (initialPagesPerQueue);
- buf.putShortString (clusterName);
- buf.putShortString (version);
-}
-
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectBroker.h Mon Nov 12 16:34:09 2007
@@ -1,73 +0,0 @@
-#ifndef _ManagementObjectBroker_
-#define _ManagementObjectBroker_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementObject.h"
-#include "qpid/Options.h"
-#include "boost/shared_ptr.hpp"
-
-namespace qpid {
-namespace broker {
-
-class ManagementObjectBroker : public ManagementObject
-{
- public:
-
- typedef boost::shared_ptr<ManagementObjectBroker> shared_ptr;
-
- ManagementObjectBroker (const Options& conf);
- ~ManagementObjectBroker (void);
-
- private:
-
- static bool schemaNeeded;
-
- std::string sysId;
- uint16_t port;
- uint16_t workerThreads;
- uint16_t maxConns;
- uint16_t connBacklog;
- uint32_t stagingThreshold;
- std::string storeLib;
- bool asyncStore;
- uint16_t mgmtPubInterval;
- uint32_t initialDiskPageSize;
- uint32_t initialPagesPerQueue;
- std::string clusterName;
- std::string version;
-
- uint16_t getObjectType (void) { return OBJECT_BROKER; }
- std::string getObjectName (void) { return "broker"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
-
- inline bool getInstChanged (void) { return false; }
-};
-
-}}
-
-
-#endif /*!_ManagementObjectBroker_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp Mon Nov 12 16:34:09 2007
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementObjectQueue.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-bool ManagementObjectQueue::schemaNeeded = true;
-
-ManagementObjectQueue::ManagementObjectQueue (uint32_t _vhostRef, std::string& _name,
- bool _durable, bool _autoDelete) :
- vhostRef(_vhostRef), name(_name), durable(_durable), autoDelete(_autoDelete)
-{
- msgTotalEnqueues = 0;
- msgTotalDequeues = 0;
- msgTxEnqueues = 0;
- msgTxDequeues = 0;
- msgPersistEnqueues = 0;
- msgPersistDequeues = 0;
-
- msgDepth = 0;
- msgDepthLow = 0;
- msgDepthHigh = 0;
-
- byteTotalEnqueues = 0;
- byteTotalDequeues = 0;
- byteTxEnqueues = 0;
- byteTxDequeues = 0;
- bytePersistEnqueues = 0;
- bytePersistDequeues = 0;
-
- byteDepth = 0;
- byteDepthLow = 0;
- byteDepthHigh = 0;
-
- enqueueTxStarts = 0;
- enqueueTxCommits = 0;
- enqueueTxRejects = 0;
- dequeueTxStarts = 0;
- dequeueTxCommits = 0;
- dequeueTxRejects = 0;
-
- enqueueTxCount = 0;
- enqueueTxCountLow = 0;
- enqueueTxCountHigh = 0;
-
- dequeueTxCount = 0;
- dequeueTxCountLow = 0;
- dequeueTxCountHigh = 0;
-
- consumers = 0;
- consumersLow = 0;
- consumersHigh = 0;
-}
-
-ManagementObjectQueue::~ManagementObjectQueue () {}
-
-void ManagementObjectQueue::writeSchema (Buffer& buf)
-{
- schemaNeeded = false;
-
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT32, "vhostRef", "Virtual Host Ref", true);
- schemaItem (buf, TYPE_STRING, "name", "Queue Name", true);
- schemaItem (buf, TYPE_BOOL, "durable", "Durable", true);
- schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true);
- schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued");
- schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued");
- schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued");
- schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages");
- schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval");
- schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval");
- schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued");
- schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued");
- schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued");
- schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued");
- schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
- schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
- schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes");
- schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval");
- schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started ");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started ");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval");
- schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue");
- schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval");
- schemaListEnd (buf);
-}
-
-void ManagementObjectQueue::writeConfig (Buffer& buf)
-{
- configChanged = false;
-
- writeTimestamps (buf);
- buf.putLong (vhostRef);
- buf.putShortString (name);
- buf.putOctet (durable ? 1 : 0);
- buf.putOctet (autoDelete ? 1 : 0);
-}
-
-void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
-{
- instChanged = false;
-
- writeTimestamps (buf);
- buf.putLongLong (msgTotalEnqueues);
- buf.putLongLong (msgTotalDequeues);
- buf.putLongLong (msgTxEnqueues);
- buf.putLongLong (msgTxDequeues);
- buf.putLongLong (msgPersistEnqueues);
- buf.putLongLong (msgPersistDequeues);
- buf.putLong (msgDepth);
- buf.putLong (msgDepthLow);
- buf.putLong (msgDepthHigh);
- buf.putLongLong (byteTotalEnqueues);
- buf.putLongLong (byteTotalDequeues);
- buf.putLongLong (byteTxEnqueues);
- buf.putLongLong (byteTxDequeues);
- buf.putLongLong (bytePersistEnqueues);
- buf.putLongLong (bytePersistDequeues);
- buf.putLong (byteDepth);
- buf.putLong (byteDepthLow);
- buf.putLong (byteDepthHigh);
- buf.putLongLong (enqueueTxStarts);
- buf.putLongLong (enqueueTxCommits);
- buf.putLongLong (enqueueTxRejects);
- buf.putLong (enqueueTxCount);
- buf.putLong (enqueueTxCountLow);
- buf.putLong (enqueueTxCountHigh);
- buf.putLongLong (dequeueTxStarts);
- buf.putLongLong (dequeueTxCommits);
- buf.putLongLong (dequeueTxRejects);
- buf.putLong (dequeueTxCount);
- buf.putLong (dequeueTxCountLow);
- buf.putLong (dequeueTxCountHigh);
- buf.putLong (consumers);
- buf.putLong (consumersLow);
- buf.putLong (consumersHigh);
-
- msgDepthLow = msgDepth;
- msgDepthHigh = msgDepth;
- byteDepthLow = byteDepth;
- byteDepthHigh = byteDepth;
- enqueueTxCountLow = enqueueTxCount;
- enqueueTxCountHigh = enqueueTxCount;
- dequeueTxCountLow = dequeueTxCount;
- dequeueTxCountHigh = dequeueTxCount;
- consumersLow = consumers;
- consumersHigh = consumers;
-}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectQueue.h Mon Nov 12 16:34:09 2007
@@ -1,181 +0,0 @@
-#ifndef _ManagementObjectQueue_
-#define _ManagementObjectQueue_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementObject.h"
-
-namespace qpid {
-namespace broker {
-
-const uint32_t MSG_MASK_TX = 1; // Transactional message
-const uint32_t MSG_MASK_PERSIST = 2; // Persistent message
-
-class ManagementObjectQueue : public ManagementObject
-{
- private:
-
- static bool schemaNeeded;
-
- uint32_t vhostRef;
- std::string name;
- bool durable;
- bool autoDelete;
-
- uint64_t msgTotalEnqueues; // Total messages enqueued
- uint64_t msgTotalDequeues; // Total messages dequeued
- uint64_t msgTxEnqueues; // Transactional messages enqueued
- uint64_t msgTxDequeues; // Transactional messages dequeued
- uint64_t msgPersistEnqueues; // Persistent messages enqueued
- uint64_t msgPersistDequeues; // Persistent messages dequeued
-
- uint32_t msgDepth; // Current size of queue in messages
- uint32_t msgDepthLow; // Low-water queue size, this interval
- uint32_t msgDepthHigh; // High-water queue size, this interval
-
- uint64_t byteTotalEnqueues; // Total messages enqueued
- uint64_t byteTotalDequeues; // Total messages dequeued
- uint64_t byteTxEnqueues; // Transactional messages enqueued
- uint64_t byteTxDequeues; // Transactional messages dequeued
- uint64_t bytePersistEnqueues; // Persistent messages enqueued
- uint64_t bytePersistDequeues; // Persistent messages dequeued
-
- uint32_t byteDepth; // Current size of queue in bytes
- uint32_t byteDepthLow; // Low-water mark this interval
- uint32_t byteDepthHigh; // High-water mark this interval
-
- uint64_t enqueueTxStarts; // Total enqueue transactions started
- uint64_t enqueueTxCommits; // Total enqueue transactions committed
- uint64_t enqueueTxRejects; // Total enqueue transactions rejected
-
- uint32_t enqueueTxCount; // Current pending enqueue transactions
- uint32_t enqueueTxCountLow; // Low water mark this interval
- uint32_t enqueueTxCountHigh; // High water mark this interval
-
- uint64_t dequeueTxStarts; // Total dequeue transactions started
- uint64_t dequeueTxCommits; // Total dequeue transactions committed
- uint64_t dequeueTxRejects; // Total dequeue transactions rejected
-
- uint32_t dequeueTxCount; // Current pending dequeue transactions
- uint32_t dequeueTxCountLow; // Low water mark this interval
- uint32_t dequeueTxCountHigh; // High water mark this interval
-
- uint32_t consumers; // Current consumers on queue
- uint32_t consumersLow; // Low water mark this interval
- uint32_t consumersHigh; // High water mark this interval
-
- uint16_t getObjectType (void) { return OBJECT_QUEUE; }
- std::string getObjectName (void) { return "queue"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& buf);
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
-
- inline void adjustQueueHiLo (void){
- if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
- if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
-
- if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
- if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
- instChanged = true;
- }
-
- inline void adjustTxHiLo (void){
- if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
- if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
- if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
- if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
- instChanged = true;
- }
-
- inline void adjustConsumerHiLo (void){
- if (consumers > consumersHigh) consumersHigh = consumers;
- if (consumers < consumersLow) consumersLow = consumers;
- instChanged = true;
- }
-
- public:
-
- typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr;
-
- ManagementObjectQueue (uint32_t _vhostRef, std::string& name,
- bool durable, bool autoDelete);
- ~ManagementObjectQueue (void);
-
- // The following mask contents are used to describe enqueued or dequeued
- // messages when counting statistics.
-
- inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
- msgTotalEnqueues++;
- byteTotalEnqueues += bytes;
-
- if (attrMask & MSG_MASK_TX){
- msgTxEnqueues++;
- byteTxEnqueues += bytes;
- }
-
- if (attrMask & MSG_MASK_PERSIST){
- msgPersistEnqueues++;
- bytePersistEnqueues += bytes;
- }
-
- msgDepth++;
- byteDepth += bytes;
- adjustQueueHiLo ();
- }
-
- inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
- msgTotalDequeues++;
- byteTotalDequeues += bytes;
-
- if (attrMask & MSG_MASK_TX){
- msgTxDequeues++;
- byteTxDequeues += bytes;
- }
-
- if (attrMask & MSG_MASK_PERSIST){
- msgPersistDequeues++;
- bytePersistDequeues += bytes;
- }
-
- msgDepth--;
- byteDepth -= bytes;
- adjustQueueHiLo ();
- }
-
- inline void incConsumers (void){
- consumers++;
- adjustConsumerHiLo ();
- }
-
- inline void decConsumers (void){
- consumers--;
- adjustConsumerHiLo ();
- }
-};
-
-}}
-
-
-
-#endif /*!_ManagementObjectQueue_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp Mon Nov 12 16:34:09 2007
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Broker.h"
-#include "ManagementObjectVhost.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-bool ManagementObjectVhost::schemaNeeded = true;
-
-ManagementObjectVhost::ManagementObjectVhost (uint32_t _sysRef, const Options& /*_conf*/) :
- sysRef(_sysRef), name("/") {}
-
-ManagementObjectVhost::~ManagementObjectVhost () {}
-
-void ManagementObjectVhost::writeSchema (Buffer& buf)
-{
- schemaNeeded = false;
-
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT32, "brokerRef", "Broker Reference" , true);
- schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true);
- schemaListEnd (buf);
-}
-
-void ManagementObjectVhost::writeConfig (Buffer& buf)
-{
- configChanged = false;
-
- writeTimestamps (buf);
- buf.putLong (sysRef);
- buf.putShortString (name);
-}
-
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h?rev=594364&r1=594363&r2=594364&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/management/ManagementObjectVhost.h Mon Nov 12 16:34:09 2007
@@ -1,62 +0,0 @@
-#ifndef _ManagementObjectVhost_
-#define _ManagementObjectVhost_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementObject.h"
-#include "qpid/Options.h"
-#include "boost/shared_ptr.hpp"
-
-namespace qpid {
-namespace broker {
-
-class ManagementObjectVhost : public ManagementObject
-{
- public:
-
- typedef boost::shared_ptr<ManagementObjectVhost> shared_ptr;
-
- ManagementObjectVhost (uint32_t sysRef, const Options& conf);
- ~ManagementObjectVhost (void);
-
- private:
-
- static bool schemaNeeded;
-
- uint32_t sysRef;
- std::string name;
-
- uint16_t getObjectType (void) { return OBJECT_VHOST; }
- std::string getObjectName (void) { return "vhost"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
-
- inline bool getInstChanged (void) { return false; }
-};
-
-}}
-
-
-#endif /*!_ManagementObjectVhost_*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Args.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Args.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Args.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Args.h Mon Nov 12 16:34:09 2007
@@ -0,0 +1,39 @@
+#ifndef _Args_
+#define _Args_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+
+namespace qpid {
+namespace management {
+
+class Args
+{
+ public:
+
+ virtual ~Args (void) = 0;
+
+};
+
+inline Args::~Args (void) {}
+
+}}
+
+#endif /*!_Args_*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ArgsBrokerEcho.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ArgsBrokerEcho.h?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ArgsBrokerEcho.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ArgsBrokerEcho.h Mon Nov 12 16:34:09 2007
@@ -0,0 +1,38 @@
+#ifndef _ArgsBrokerEcho_
+#define _ArgsBrokerEcho_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "Args.h"
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ArgsBrokerEcho : public Args
+{
+ public:
+ uint32_t io_sequence;
+ std::string io_body;
+};
+
+}}
+
+#endif /*!_ArgsBrokerEcho_*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp?rev=594364&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp Mon Nov 12 16:34:09 2007
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+#include "qpid/broker/Broker.h"
+#include "Broker.h"
+#include "ArgsBrokerEcho.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Broker::schemaNeeded = true;
+
+Broker::Broker (Manageable* _core, const Options& _conf) :
+ ManagementObject (_core)
+{
+ broker::Broker::Options& conf = (broker::Broker::Options&) _conf;
+
+ sysId = "sysId";
+ port = conf.port;
+ workerThreads = conf.workerThreads;
+ maxConns = conf.maxConnections;
+ connBacklog = conf.connectionBacklog;
+ stagingThreshold = conf.stagingThreshold;
+ storeLib = conf.store;
+ asyncStore = conf.storeAsync;
+ mgmtPubInterval = conf.mgmtPubInterval;
+ initialDiskPageSize = 0;
+ initialPagesPerQueue = 0;
+ clusterName = "";
+ version = PACKAGE_VERSION;
+}
+
+Broker::~Broker () {}
+
+void Broker::writeSchema (Buffer& buf)
+{
+ schemaNeeded = false;
+
+ schemaListBegin (buf);
+ schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true);
+ schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true);
+ schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
+ schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true);
+ schemaItem (buf, TYPE_UINT16, "connBacklog",
+ "Connection backlog limit for listening socket", true);
+ schemaItem (buf, TYPE_UINT32, "stagingThreshold",
+ "Broker stages messages over this size to disk", true);
+ schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true);
+ schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true);
+ schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
+ schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
+ "Number of disk pages allocated for storage", true);
+ schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
+ "Number of disk pages allocated per queue", true);
+ schemaItem (buf, TYPE_STRING, "clusterName",
+ "Name of cluster this server is a member of, zero-length for standalone server", true);
+ schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
+ schemaListEnd (buf);
+}
+
+void Broker::writeConfig (Buffer& buf)
+{
+ configChanged = false;
+
+ writeTimestamps (buf);
+ buf.putLong (0);
+ buf.putShort (port);
+ buf.putShort (workerThreads);
+ buf.putShort (maxConns);
+ buf.putShort (connBacklog);
+ buf.putLong (stagingThreshold);
+ buf.putShortString (storeLib);
+ buf.putOctet (asyncStore ? 1 : 0);
+ buf.putShort (mgmtPubInterval);
+ buf.putLong (initialDiskPageSize);
+ buf.putLong (initialPagesPerQueue);
+ buf.putShortString (clusterName);
+ buf.putShortString (version);
+}
+
+void Broker::doMethod (string methodName,
+ Buffer& inBuf,
+ Buffer& outBuf)
+{
+ if (methodName.compare ("echo") == 0)
+ {
+ ArgsBrokerEcho args;
+ uint32_t result;
+
+ args.io_sequence = inBuf.getLong ();
+ inBuf.getLongString (args.io_body);
+
+ result = coreObject->ManagementMethod (1, args);
+
+ outBuf.putLong (result);
+ outBuf.putShortString ("OK");
+ outBuf.putLong (args.io_sequence);
+ outBuf.putLongString (args.io_body);
+ }
+ else
+ {
+ outBuf.putLong (1);
+ outBuf.putShortString ("Unknown Method");
+ }
+}
+