You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/12/06 19:37:20 UTC
svn commit: r601807 - in /incubator/qpid/trunk/qpid/cpp: managementgen/
managementgen/templates/ src/qpid/broker/ src/qpid/management/
Author: aconway
Date: Thu Dec 6 10:37:18 2007
New Revision: 601807
URL: http://svn.apache.org/viewvc?rev=601807&view=rev
Log:
>From Ted Ross <tr...@redhat.com>
Queue statistics fixed. Additional objects added (exchange, binding).
Changes:
M cpp/src/qpid/broker/ExchangeRegistry.h
M cpp/src/qpid/broker/ExchangeRegistry.cpp
ExchangeRegistry was modified to pass a parent pointer to created
exchanges. This parent reference is not stored but is used to
link management objects in a hierarchy of ownership.
M cpp/src/qpid/broker/Exchange.h
M cpp/src/qpid/broker/Exchange.cpp
Exchange now inherits Manageable to make it visible via the
management interface. The Exchange parent class handles most of
the management boilerplate. A Binding struct was introduced to
track bindings for management. This is separate from
QueueBindings which track bindings for queues.
M cpp/src/qpid/broker/HeadersExchange.h
M cpp/src/qpid/broker/FanOutExchange.h
M cpp/src/qpid/broker/DirectExchange.h
M cpp/src/qpid/broker/TopicExchange.h
M cpp/src/qpid/broker/HeadersExchange.cpp
M cpp/src/qpid/broker/FanOutExchange.cpp
M cpp/src/qpid/broker/DirectExchange.cpp
M cpp/src/qpid/broker/TopicExchange.cpp
M cpp/src/qpid/management/ManagementExchange.cpp
M cpp/src/qpid/management/ManagementExchange.h
Each exchange type handles management stats in its own specific
way. Additionally, the constructors pass the management parent
pointer to the constructor or Exchange.
An extra layer was added to contain bindings. Instead of directly
storing bound queues, the exchanges store "bindings" which are
managable constructs.
M cpp/src/qpid/broker/Broker.cpp
Broker now explicitly enables the management agent. Also sets the
management parent (vhost) in the exchange registry.
M cpp/src/qpid/broker/Vhost.cpp
Updated constructor to be more defensive in case the management
agent has not been enabled.
M cpp/src/qpid/broker/Queue.cpp
Same constructor update as vhost. Moved accounting of dequeues
into "pop". Implemented management method handler (purge).
M cpp/src/qpid/broker/Deliverable.h
A new method was added to extract the content size of the
deliverable content (if appropriate). The method is not pure
virtual and returns zero if not overridden.
M cpp/src/qpid/broker/DeliverableMessage.h
M cpp/src/qpid/broker/TxPublish.cpp
M cpp/src/qpid/broker/DeliverableMessage.cpp
M cpp/src/qpid/broker/TxPublish.h
These derivatives of Deliverable were updated with overrides for
contenSize.
M cpp/src/qpid/management/ManagementAgent.h
M cpp/src/qpid/management/ManagementAgent.cpp
An "enable" method was added to prevent inadvertent creation of a
management agent when not desired.
Adding and deleting management objects is now protected by a
mutex.
Make sure that deleted objects get reported even if neither their
configuration nor instrumentation is changed.
M specs/management-schema.xml
Minor cosmetic updates. Additional parent linkage.
M cpp/managementgen/schema.py
M cpp/managementgen/templates/Class.cpp
Added generated code to publish schema details for methods.
Modified:
incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Thu Dec 6 10:37:18 2007
@@ -397,6 +397,24 @@
def getDir (self):
return self.dir
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
+ stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n")
+ if self.unit != None:
+ stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n")
+ if self.min != None:
+ stream.write (" ft.setInt (MIN, " + self.min + ");\n")
+ if self.max != None:
+ stream.write (" ft.setInt (MAX, " + self.max + ");\n")
+ if self.maxLen != None:
+ stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ if self.default != None:
+ stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
#=====================================================================================
#
@@ -455,6 +473,16 @@
dirTag = arg.dir.lower() + "_"
stream.write (" " + ctype + " " + dirTag + arg.getName () + ";\n")
+ def genSchema (self, stream):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
+ for arg in self.args:
+ arg.genSchema (stream)
+
#=====================================================================================
#
#=====================================================================================
@@ -550,15 +578,6 @@
for inst in self.instElements:
inst.genAccessor (stream)
- def genArgDeclaration (self, stream):
- argsFound = 0
- for method in self.methods:
- argsFound = argsFound + len (method.args)
- for event in self.events:
- argsFound = argsFound + len (event.args)
- if argsFound > 0:
- stream.write ("FieldTable arg;");
-
def genConfigCount (self, stream):
stream.write ("%d" % len (self.configElements))
@@ -683,7 +702,8 @@
number = number + 1
def genMethodSchema (self, stream):
- pass ###########################################################################
+ for method in self.methods:
+ method.genSchema (stream)
def genNameCap (self, stream):
stream.write (self.name.capitalize ())
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp Thu Dec 6 10:37:18 2007
@@ -53,12 +53,15 @@
const string MAX("max");
const string MAXLEN("maxlen");
const string DESC("desc");
+ const string ARGCOUNT("argCount");
+ const string ARGS("args");
+ const string DIR("dir");
+ const string DEFAULT("default");
}
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
{
FieldTable ft;
- /*MGEN:Class.ArgDeclaration*/
schemaNeeded = false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Dec 6 10:37:18 2007
@@ -130,6 +130,7 @@
sessionManager(conf.ack)
{
if(conf.enableMgmt){
+ ManagementAgent::enableManagement ();
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
@@ -154,7 +155,8 @@
Vhost* vhost = new Vhost (this);
vhostObject = Vhost::shared_ptr (vhost);
- queues.setParent (vhost);
+ queues.setParent (vhost);
+ exchanges.setParent (vhost);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -284,7 +286,6 @@
case management::Broker::METHOD_JOINCLUSTER :
case management::Broker::METHOD_LEAVECLUSTER :
- case management::Broker::METHOD_CRASH :
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Thu Dec 6 10:37:18 2007
@@ -30,6 +30,7 @@
bool delivered;
Deliverable() : delivered(false) {}
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Thu Dec 6 10:37:18 2007
@@ -37,3 +37,7 @@
return *msg;
}
+uint64_t DeliverableMessage::contentSize ()
+{
+ return msg->contentSize ();
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Thu Dec 6 10:37:18 2007
@@ -33,6 +33,7 @@
DeliverableMessage(intrusive_ptr<Message>& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
Message& getMessage();
+ uint64_t contentSize();
virtual ~DeliverableMessage(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Dec 6 10:37:18 2007
@@ -25,16 +25,37 @@
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
+using qpid::management::Manageable;
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == queues.end()) {
- bindings[routingKey].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingKey].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else{
return false;
@@ -43,14 +64,21 @@
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = queues.begin(); i != queues.end(); i++)
+ if ((*i)->queue == queue)
+ break;
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if (i < queues.end()) {
queues.erase(i);
- if(queues.empty()){
+ if (queues.empty()) {
bindings.erase(routingKey);
}
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -59,38 +87,65 @@
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Binding::shared_ptr>::iterator i;
int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
+
+ for(i = queues.begin(); i != queues.end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ }
+ else {
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
}
}
bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
{
+ std::vector<Binding::shared_ptr>::iterator j;
+
if (routingKey) {
Bindings::iterator i = bindings.find(*routingKey);
- return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+
+ if (i == bindings.end())
+ return false;
+ if (!queue)
+ return true;
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
- return true;
- }
- }
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++)
+ for (j = i->second.begin(); j != i->second.end(); j++)
+ if ((*j)->queue == queue)
+ return true;
return false;
}
-}
-
-DirectExchange::~DirectExchange(){
+ return false;
}
+DirectExchange::~DirectExchange() {}
const std::string DirectExchange::typeName("direct");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Dec 6 10:37:18 2007
@@ -31,17 +31,17 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- typedef std::vector<Queue::shared_ptr> Queues;
- typedef std::map<string, Queues > Bindings;
+ typedef std::vector<Binding::shared_ptr> Queues;
+ typedef std::map<string, Queues> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- DirectExchange(const std::string& name);
+ DirectExchange(const std::string& name, management::Manageable* parent = 0);
DirectExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Dec 6 10:37:18 2007
@@ -21,10 +21,52 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
+#include "qpid/management/ManagementAgent.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+Exchange::Exchange (const string& _name, Manageable* parent) :
+ name(_name), durable(false), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ Manageable* parent)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get () != 0)
+ {
+ mgmtExchange = management::Exchange::shared_ptr
+ (new management::Exchange (this, parent, _name));
+ agent->addObject (mgmtExchange);
+ }
+ }
+}
+
+Exchange::~Exchange ()
+{
+ if (mgmtExchange.get () != 0)
+ mgmtExchange->resourceDestroy ();
+}
Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer)
{
@@ -56,5 +98,43 @@
+ args.size();
}
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtExchange);
+}
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent)
+ : queue(_queue), key(_key)
+{
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ if (agent.get() != 0)
+ {
+ ManagementObject::shared_ptr mo = queue->GetManagementObject();
+ if (mo.get() != 0)
+ {
+ uint64_t queueId = mo->getObjectId();
+ mgmtBinding = management::Binding::shared_ptr
+ (new management::Binding (this, (Manageable*) parent, queueId, key));
+ agent->addObject (mgmtBinding);
+ }
+ }
+ }
+}
+
+Exchange::Binding::~Binding ()
+{
+ if (mgmtBinding.get () != 0)
+ mgmtBinding->resourceDestroy ();
+}
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtBinding);
+}
+
+Manageable::status_t Exchange::Binding::ManagementMethod (uint32_t, Args&)
+{
+ return Manageable::STATUS_UNKNOWN_METHOD;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Dec 6 10:37:18 2007
@@ -28,13 +28,16 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Exchange.h"
+#include "qpid/management/Binding.h"
namespace qpid {
namespace broker {
using std::string;
class ExchangeRegistry;
- class Exchange : public PersistableExchange{
+ class Exchange : public PersistableExchange, public management::Manageable {
private:
const string name;
const bool durable;
@@ -43,13 +46,31 @@
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ protected:
+ struct Binding : public management::Manageable {
+ typedef boost::shared_ptr<Binding> shared_ptr;
+ typedef std::vector<Binding::shared_ptr> vector;
+
+ Queue::shared_ptr queue;
+ const std::string key;
+ const qpid::framing::FieldTable args;
+ management::Binding::shared_ptr mgmtBinding;
+
+ Binding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent = 0);
+ ~Binding ();
+ management::ManagementObject::shared_ptr GetManagementObject () const;
+ management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args);
+ };
+
+ management::Exchange::shared_ptr mgmtExchange;
+
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
- explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
- Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
- virtual ~Exchange(){}
+ explicit Exchange(const string& name, management::Manageable* parent = 0);
+ Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
+ virtual ~Exchange();
const string& getName() const { return name; }
bool isDurable() { return durable; }
@@ -75,6 +96,10 @@
static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t, management::Args&) { return management::Manageable::STATUS_UNKNOWN_METHOD; }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Dec 6 10:37:18 2007
@@ -46,15 +46,15 @@
Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
}else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args));
+ exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
}else{
throw UnknownExchangeTypeException();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Dec 6 10:37:18 2007
@@ -27,6 +27,7 @@
#include "MessageStore.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/management/Manageable.h"
namespace qpid {
namespace broker {
@@ -36,7 +37,9 @@
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
qpid::sys::RWlock lock;
+ management::Manageable* parent;
public:
+ ExchangeRegistry () : parent(0) {}
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
@@ -45,6 +48,11 @@
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
Exchange::shared_ptr getDefault();
+
+ /**
+ * Register the manageable parent for declared queues
+ */
+ void setParent (management::Manageable* _parent) { parent = _parent; }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Dec 6 10:37:18 2007
@@ -25,15 +25,36 @@
using namespace qpid::framing;
using namespace qpid::sys;
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
+ std::vector<Binding::shared_ptr>::iterator i;
+
// Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(queue);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ bindings.push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -42,9 +63,17 @@
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedWlock locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -53,14 +82,40 @@
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
RWlock::ScopedRlock locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- msg.deliverTo(*i);
+ uint32_t count(0);
+
+ for(std::vector<Binding::shared_ptr>::iterator i = bindings.begin(); i != bindings.end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding.get() != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
- return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+ std::vector<Binding::shared_ptr>::iterator i;
+
+ for (i = bindings.begin (); i != bindings.end(); i++)
+ if ((*i)->queue == queue)
+ break;
+
+ return i != bindings.end();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Dec 6 10:37:18 2007
@@ -32,15 +32,16 @@
namespace broker {
class FanOutExchange : public virtual Exchange {
- std::vector<Queue::shared_ptr> bindings;
+ std::vector<Binding::shared_ptr> bindings;
qpid::sys::RWlock lock;
public:
static const std::string typeName;
- FanOutExchange(const std::string& name);
+ FanOutExchange(const std::string& name, management::Manageable* parent = 0);
FanOutExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Dec 6 10:37:18 2007
@@ -40,19 +40,40 @@
const std::string x_match("x-match");
}
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
+ Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
if (!what || (*what != all && *what != any))
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
- Binding binding(*args, queue);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), binding);
+ Bindings::iterator i;
+
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i == bindings.end()) {
- bindings.push_back(binding);
+ Binding::shared_ptr binding (new Binding ("", queue, this));
+ HeaderMap headerMap(*args, binding);
+
+ bindings.push_back(headerMap);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
} else {
return false;
@@ -61,10 +82,16 @@
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ Bindings::iterator i;
+ for (i = bindings.begin(); i != bindings.end(); i++)
+ if (i->first == *args && i->second->queue == queue)
+ break;
+
if (i != bindings.end()) {
bindings.erase(i);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
} else {
return false;
@@ -73,9 +100,29 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- RWlock::ScopedRlock locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) msg.deliverTo(i->second);
+ RWlock::ScopedRlock locker(lock);
+ uint32_t count(0);
+
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i, count++) {
+ if (match(i->first, *args)) msg.deliverTo(i->second->queue);
+ if (i->second->mgmtBinding.get() != 0)
+ i->second->mgmtBinding->inc_msgMatched ();
+ }
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
}
}
@@ -83,7 +130,7 @@
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if ( (!args || equal(i->first, *args)) && (!queue || i->second == queue)) {
+ if ( (!args || equal(i->first, *args)) && (!queue || i->second->queue == queue)) {
return true;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Dec 6 10:37:18 2007
@@ -32,8 +32,8 @@
class HeadersExchange : public virtual Exchange {
- typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
- typedef std::vector<Binding> Bindings;
+ typedef std::pair<qpid::framing::FieldTable, Binding::shared_ptr> HeaderMap;
+ typedef std::vector<HeaderMap> Bindings;
Bindings bindings;
qpid::sys::RWlock lock;
@@ -41,9 +41,10 @@
public:
static const std::string typeName;
- HeadersExchange(const string& name);
+ HeadersExchange(const string& name, management::Manageable* parent = 0);
HeadersExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec 6 10:37:18 2007
@@ -59,11 +59,14 @@
{
if (parent != 0)
{
- mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+ agent->addObject (mgmtObject);
+ }
}
}
@@ -93,14 +96,14 @@
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -118,7 +121,7 @@
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -136,7 +139,7 @@
void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -319,7 +322,7 @@
}
consumerCount++;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->inc_consumers ();
}
}
@@ -329,7 +332,7 @@
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = false;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
}
@@ -341,16 +344,6 @@
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
- mgmtObject->dec_msgDepth ();
- //mgmtObject->dec_byteDepth (msg->contentSize ());
- if (0){//msg->isPersistent ()) {
- mgmtObject->inc_msgPersistDequeues ();
- //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
- }
- }
}
return msg;
}
@@ -366,7 +359,19 @@
* Assumes messageLock is held
*/
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
+ QueuedMessage& msg = messages.front();
+
+ if (policy.get()) policy->dequeued(msg.payload->contentSize());
+ if (mgmtObject.get() != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ mgmtObject->dec_msgDepth ();
+ mgmtObject->dec_byteDepth (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
+ }
+ }
messages.pop_front();
}
@@ -473,7 +478,8 @@
}
}
-void Queue::bound(const string& exchange, const string& key, const FieldTable& args)
+void Queue::bound(const string& exchange, const string& key,
+ const FieldTable& args)
{
bindings.add(exchange, key, args);
}
@@ -584,8 +590,24 @@
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
-Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
Args& /*args*/)
{
- return Manageable::STATUS_OK;
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Queue::METHOD_PURGE :
+ purge ();
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Queue::METHOD_INCREASEJOURNALSIZE :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Dec 6 10:37:18 2007
@@ -115,9 +115,19 @@
return do_match(begin(), end(), target.begin(), target.end());
}
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-TopicExchange::TopicExchange(const std::string& _name, bool _durable, const FieldTable& _args) : Exchange(_name, _durable, _args) {}
+TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
+TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+ const FieldTable& _args, Manageable* _parent) :
+ Exchange(_name, _durable, _args, _parent)
+{
+ if (mgmtExchange.get() != 0)
+ mgmtExchange->set_type (typeName);
+}
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
@@ -125,7 +135,11 @@
if (isBound(queue, routingPattern)) {
return false;
} else {
- bindings[routingPattern].push_back(queue);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this));
+ bindings[routingPattern].push_back(binding);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->inc_bindings ();
+ }
return true;
}
}
@@ -133,12 +147,19 @@
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
+ Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
if(q == qv.end()) return false;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
+ if (mgmtExchange.get() != 0) {
+ mgmtExchange->dec_bindings ();
+ }
return true;
}
@@ -146,21 +167,45 @@
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Queue::vector& qv(bi->second);
- return find(qv.begin(), qv.end(), queue) != qv.end();
+ Binding::vector& qv(bi->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ break;
+ return q != qv.end();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ uint32_t count(0);
Tokens tokens(routingKey);
+
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- msg.deliverTo(*j);
+ Binding::vector& qv(i->second);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
+ msg.deliverTo((*j)->queue);
+ if ((*j)->mgmtBinding.get() != 0)
+ (*j)->mgmtBinding->inc_msgMatched ();
}
}
}
+
+ if (mgmtExchange.get() != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
@@ -176,16 +221,16 @@
return true;
}
}
- return false;
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Queue::vector& qv(i->second);
- if (find(qv.begin(), qv.end(), queue) != qv.end()) {
- return true;
- }
+ Binding::vector& qv(i->second);
+ Binding::vector::iterator q;
+ for (q = qv.begin(); q != qv.end(); q++)
+ if ((*q)->queue == queue)
+ return true;
}
- return false;
}
+ return false;
}
TopicExchange::~TopicExchange() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Dec 6 10:37:18 2007
@@ -71,7 +71,7 @@
};
class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ typedef std::map<TopicPattern, Binding::vector> BindingMap;
BindingMap bindings;
qpid::sys::RWlock lock;
@@ -79,9 +79,9 @@
public:
static const std::string typeName;
- TopicExchange(const string& name);
+ TopicExchange(const string& name, management::Manageable* parent = 0);
TopicExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
virtual std::string getType() const { return typeName; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Thu Dec 6 10:37:18 2007
@@ -67,3 +67,7 @@
queue->process(msg);
}
+uint64_t TxPublish::contentSize ()
+{
+ return msg->contentSize ();
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Thu Dec 6 10:37:18 2007
@@ -66,10 +66,12 @@
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
-
+
virtual void deliverTo(Queue::shared_ptr& queue);
virtual ~TxPublish(){}
+
+ uint64_t contentSize();
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Thu Dec 6 10:37:18 2007
@@ -27,11 +27,14 @@
{
if (parentBroker != 0)
{
- mgmtObject = management::Vhost::shared_ptr
- (new management::Vhost (this, parentBroker, "/"));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Vhost::shared_ptr
+ (new management::Vhost (this, parentBroker, "/"));
+ agent->addObject (mgmtObject);
+ }
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Dec 6 10:37:18 2007
@@ -33,6 +33,7 @@
using namespace qpid::sys;
ManagementAgent::shared_ptr ManagementAgent::agent;
+bool ManagementAgent::enabled = 0;
ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
{
@@ -40,16 +41,21 @@
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+void ManagementAgent::enableManagement (void)
+{
+ enabled = 1;
+}
+
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
{
- if (agent.get () == 0)
+ if (enabled && agent.get () == 0)
agent = shared_ptr (new ManagementAgent (10));
return agent;
}
-void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
- Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
+ broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
@@ -57,6 +63,7 @@
void ManagementAgent::addObject (ManagementObject::shared_ptr object)
{
+ RWlock::ScopedWlock writeLock (userLock);
uint64_t objectId = nextObjectId++;
object->setObjectId (objectId);
@@ -74,6 +81,8 @@
void ManagementAgent::clientAdded (void)
{
+ RWlock::ScopedRlock readLock (userLock);
+
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -94,7 +103,7 @@
void ManagementAgent::SendBuffer (Buffer& buf,
uint32_t length,
- Exchange::shared_ptr exchange,
+ broker::Exchange::shared_ptr exchange,
string routingKey)
{
intrusive_ptr<Message> msg (new Message ());
@@ -129,9 +138,10 @@
{
#define BUFSIZE 65536
#define THRESHOLD 16384
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
+ RWlock::ScopedWlock writeLock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
std::list<uint64_t> deleteList;
if (managementObjects.empty ())
@@ -157,7 +167,7 @@
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
- if (object->getConfigChanged ())
+ if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
EncodeHeader (msgBuffer);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Dec 6 10:37:18 2007
@@ -25,6 +25,7 @@
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
+#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
#include <boost/shared_ptr.hpp>
@@ -41,11 +42,12 @@
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+ static void enableManagement (void);
static shared_ptr getAgent (void);
void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (broker::Exchange::shared_ptr mgmtExchange,
- broker::Exchange::shared_ptr directExchange);
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
void addObject (ManagementObject::shared_ptr object);
void clientAdded (void);
void dispatchCommand (broker::Deliverable& msg,
@@ -64,6 +66,9 @@
};
static shared_ptr agent;
+ static bool enabled;
+
+ qpid::sys::RWlock userLock;
ManagementObjectMap managementObjects;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp Thu Dec 6 10:37:18 2007
@@ -27,13 +27,14 @@
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name) :
- Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
+ Exchange (_name, _parent), TopicExchange(_name, _parent) {}
ManagementExchange::ManagementExchange (const std::string& _name,
bool _durable,
- const FieldTable& _args) :
- Exchange (_name, _durable, _args),
- TopicExchange(_name, _durable, _args) {}
+ const FieldTable& _args,
+ Manageable* _parent) :
+ Exchange (_name, _durable, _args, _parent),
+ TopicExchange(_name, _durable, _args, _parent) {}
bool ManagementExchange::bind (Queue::shared_ptr queue,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h?rev=601807&r1=601806&r2=601807&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h Thu Dec 6 10:37:18 2007
@@ -35,9 +35,10 @@
public:
static const std::string typeName;
- ManagementExchange (const string& name);
+ ManagementExchange (const string& name, Manageable* _parent = 0);
ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args);
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0);
virtual std::string getType() const { return typeName; }