You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/08 04:46:45 UTC
svn commit: r920189 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/management/
Author: tross
Date: Mon Mar 8 03:46:44 2010
New Revision: 920189
URL: http://svn.apache.org/viewvc?rev=920189&view=rev
Log:
Added hooks in the broker for QMFv2 management.
Both DirectExchange and TopicExchange are now subclassed (as ManagementDirectExchange
and ManagementTopicExchange) so that messages can be redirected to the embedded
management agent. In QMFv1, only the topic exchange was subclassed this way.
Added:
qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
      - copied, changed from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
      - copied, changed from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Removed:
qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Mar 8 03:46:44 2010
@@ -636,8 +636,10 @@
qpid/management/IdAllocator.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
- qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementExchange.h \
+ qpid/management/ManagementDirectExchange.cpp \
+ qpid/management/ManagementDirectExchange.h \
+ qpid/management/ManagementTopicExchange.cpp \
+ qpid/management/ManagementTopicExchange.h \
qpid/sys/TCPIOPlugin.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Mar 8 03:46:44 2010
@@ -35,7 +35,8 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -234,11 +235,22 @@
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementExchange::typeName);
- Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
- Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
- boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get());
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
+
+ std::string qmfTopic("qmf.default.topic");
+ std::string qmfDirect("qmf.default.direct");
+
+ std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
+ std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+
+ boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
+
+ managementAgent->setExchangeV2(topicPair.first, directPair.first);
}
else
QPID_LOG(info, "Management not enabled");
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Mon Mar 8 03:46:44 2010
@@ -24,7 +24,8 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
@@ -52,8 +53,10 @@
exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
}else if (type == HeadersExchange::typeName) {
exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Mar 8 03:46:44 2010
@@ -71,11 +71,13 @@
Mutex::ScopedLock lock (userLock);
// Reset the shared pointers to exchanges. If this is not done now, the exchanges
- // will stick around until dExchange and mExchange are implicitely destroyed (long
+ // will stick around until dExchange and mExchange are implicitly destroyed (long
// after this destructor completes). Those exchanges hold references to management
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ v2Topic.reset();
+ v2Direct.reset();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -158,13 +160,20 @@
}
}
-void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
+{
+ v2Topic = _texchange;
+ v2Direct = _dexchange;
+}
+
void ManagementAgent::registerClass (const string& packageName,
const string& className,
uint8_t* md5Sum,
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Mar 8 03:46:44 2010
@@ -74,9 +74,12 @@
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
- void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
- qpid::broker::Exchange::shared_ptr directExchange);
+ void setInterval(uint16_t _interval) { interval = _interval; }
+ void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+ void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+
int getMaxThreads () { return threadPoolSize; }
QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
const std::string& className,
@@ -240,6 +243,8 @@
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
+ qpid::broker::Exchange::shared_ptr v2Topic;
+ qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
Added: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=920189&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Mon Mar 8 03:46:44 2010
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/log/Statement.h"
+#include <assert.h>
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, _args, _parent, b),
+ DirectExchange(_name, _durable, _args, _parent, b) {}
+
+void ManagementDirectExchange::route(Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ bool routeIt = true;
+
+ // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+
+ if (routeIt)
+ DirectExchange::route(msg, routingKey, args);
+}
+
+void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+ managementAgent = agent;
+ qmfVersion = qv;
+ assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange
+}
+
+
+ManagementDirectExchange::~ManagementDirectExchange() {}
+
+const std::string ManagementDirectExchange::typeName("management-direct");
+
Added: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h?rev=920189&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h Mon Mar 8 03:46:44 2010
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementDirectExchange_
+#define _ManagementDirectExchange_
+
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementDirectExchange : public virtual DirectExchange
+{
+ private:
+ management::ManagementAgent* managementAgent;
+ int qmfVersion;
+
+ public:
+ static const std::string typeName;
+
+ ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementDirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual void route(Deliverable& msg,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
+
+ virtual ~ManagementDirectExchange();
+};
+
+
+}
+}
+
+#endif
Copied: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp&r1=920168&r2=920189&rev=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Mon Mar 8 03:46:44 2010
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Statement.h"
using namespace qpid::management;
@@ -27,46 +27,50 @@
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
-ManagementExchange::ManagementExchange (const std::string& _name,
- bool _durable,
- const FieldTable& _args,
- Manageable* _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
Exchange (_name, _durable, _args, _parent, b),
TopicExchange(_name, _durable, _args, _parent, b) {}
-void ManagementExchange::route (Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementTopicExchange::route(Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
{
bool routeIt = true;
// Intercept management agent commands
- if ((routingKey.length() > 6 &&
- routingKey.substr(0, 6).compare("agent.") == 0) ||
- (routingKey == "broker"))
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
+ if (qmfVersion == 1) {
+ if ((routingKey.length() > 6 &&
+ routingKey.substr(0, 6).compare("agent.") == 0) ||
+ (routingKey == "broker"))
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
+ }
if (routeIt)
TopicExchange::route(msg, routingKey, args);
}
-bool ManagementExchange::bind (Queue::shared_ptr queue,
- const string& routingKey,
- const qpid::framing::FieldTable* args)
+bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args)
{
- managementAgent->clientAdded(routingKey);
+ if (qmfVersion == 1)
+ managementAgent->clientAdded(routingKey);
return TopicExchange::bind(queue, routingKey, args);
}
-void ManagementExchange::setManagmentAgent (ManagementAgent* agent)
+void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
{
managementAgent = agent;
+ qmfVersion = qv;
}
-ManagementExchange::~ManagementExchange() {}
+ManagementTopicExchange::~ManagementTopicExchange() {}
-const std::string ManagementExchange::typeName("management");
+const std::string ManagementTopicExchange::typeName("management-topic");
Copied: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h (from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h?p2=qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h&p1=qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h&r1=920168&r2=920189&rev=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h Mon Mar 8 03:46:44 2010
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _ManagementExchange_
-#define _ManagementExchange_
+#ifndef _ManagementTopicExchange_
+#define _ManagementTopicExchange_
#include "qpid/broker/TopicExchange.h"
#include "qpid/management/ManagementAgent.h"
@@ -27,32 +27,33 @@
namespace qpid {
namespace broker {
-class ManagementExchange : public virtual TopicExchange
+class ManagementTopicExchange : public virtual TopicExchange
{
private:
management::ManagementAgent* managementAgent;
+ int qmfVersion;
public:
static const std::string typeName;
- ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
- virtual void route (Deliverable& msg,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
- virtual bool bind (Queue::shared_ptr queue,
+ virtual void route(Deliverable& msg,
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementAgent* agent);
+ virtual bool bind(Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
- virtual ~ManagementExchange();
+ virtual ~ManagementTopicExchange();
};
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org