You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ns...@apache.org on 2008/03/25 21:30:04 UTC
svn commit: r640970 - in /incubator/qpid/trunk/qpid:
cpp/managementgen/templates/ cpp/src/qpid/broker/ cpp/src/qpid/management/
python/mgmt-cli/ python/qpid/ specs/
Author: nsantos
Date: Tue Mar 25 13:30:01 2008
New Revision: 640970
URL: http://svn.apache.org/viewvc?rev=640970&view=rev
Log:
QPID-877: applied patch from Ted Ross
Modified:
incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
incubator/qpid/trunk/qpid/python/qpid/management.py
incubator/qpid/trunk/qpid/specs/management-schema.xml
incubator/qpid/trunk/qpid/specs/management-types.xml
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp Tue Mar 25 13:30:01 2008
@@ -106,11 +106,12 @@
/*MGEN:Class.WriteConfig*/
}
-void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf)
+void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders)
{
instChanged = false;
- writeTimestamps (buf);
+ if (!skipHeaders)
+ writeTimestamps (buf);
/*MGEN:Class.WriteInst*/
// Maintenance of hi-lo statistics
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Tue Mar 25 13:30:01 2008
@@ -25,8 +25,9 @@
#include "qpid/sys/Mutex.h"
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/Uuid.h"
-namespace qpid {
+namespace qpid {
namespace management {
class /*MGEN:Class.NameCap*/ : public ManagementObject
@@ -45,7 +46,8 @@
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& buf,
+ bool skipHeaders = false);
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Mar 25 13:30:01 2008
@@ -127,7 +127,8 @@
dtxManager.setStore (store);
if(conf.enableMgmt){
- ManagementAgent::enableManagement ();
+ ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval);
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Mar 25 13:30:01 2008
@@ -25,6 +25,8 @@
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
#include <list>
+#include <iostream>
+#include <fstream>
using boost::intrusive_ptr;
using namespace qpid::framing;
@@ -36,25 +38,62 @@
ManagementAgent::shared_ptr ManagementAgent::agent;
bool ManagementAgent::enabled = 0;
-ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
+ dataDir (_dataDir), interval (_interval)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
nextRemotePrefix = 101;
+
+ // Get from file or generate and save to file.
+ if (dataDir.empty ())
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
+ << uuid);
+ }
+ else
+ {
+ string filename (dataDir + "/brokerId");
+ ifstream inFile (filename.c_str ());
+
+ if (inFile.good ())
+ {
+ inFile >> uuid;
+ inFile.close ();
+ QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+ }
+ else
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
+
+ ofstream outFile (filename.c_str ());
+ if (outFile.good ())
+ {
+ outFile << uuid << endl;
+ outFile.close ();
+ QPID_LOG (debug, "ManagementAgent saved broker ID");
+ }
+ else
+ {
+ QPID_LOG (warning, "ManagementAgent unable to save broker ID");
+ }
+ }
+ }
}
ManagementAgent::~ManagementAgent () {}
-void ManagementAgent::enableManagement (void)
+void ManagementAgent::enableManagement (string dataDir, uint16_t interval)
{
enabled = 1;
+ if (agent.get () == 0)
+ agent = shared_ptr (new ManagementAgent (dataDir, interval));
}
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
{
- if (enabled && agent.get () == 0)
- agent = shared_ptr (new ManagementAgent (10));
-
return agent;
}
@@ -122,27 +161,25 @@
}
}
-void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
- buf.putOctet ('0');
buf.putOctet ('1');
buf.putOctet (opcode);
- buf.putOctet (cls);
+ buf.putLong (seq);
}
-bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
uint8_t h1 = buf.getOctet ();
uint8_t h2 = buf.getOctet ();
uint8_t h3 = buf.getOctet ();
- uint8_t h4 = buf.getOctet ();
*opcode = buf.getOctet ();
- *cls = buf.getOctet ();
+ *seq = buf.getLong ();
- return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
}
void ManagementAgent::SendBuffer (Buffer& buf,
@@ -199,24 +236,24 @@
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'C', 'C');
+ EncodeHeader (msgBuffer, 'c');
object->writeConfig (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "mgmt.config." + object->getClassName ();
+ routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'C', 'I');
+ EncodeHeader (msgBuffer, 'i');
object->writeInstrumentation (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "mgmt.inst." + object->getClassName ();
+ routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -233,6 +270,20 @@
deleteList.clear ();
}
+// Send a command-complete message (opcode 'z') for the given request.
+//
+// The header carries the request's sequence number; the body is the
+// completion code followed by the status text encoded as a short string.
+// The message is sent through dExchange with replyToKey as routing key.
+void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
+                                           uint32_t code, string text)
+{
+    Buffer reply (outputBuffer, MA_BUFFER_SIZE);
+
+    EncodeHeader (reply, 'z', sequence);
+    reply.putLong (code);
+    reply.putShortString (text);
+
+    // Length handed to SendBuffer: buffer size minus what available ()
+    // reports once the message has been encoded.
+    const uint32_t encodedLen = MA_BUFFER_SIZE - reply.available ();
+
+    reply.reset ();
+    SendBuffer (reply, encodedLen, dExchange, replyToKey);
+}
+
void ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
@@ -295,13 +346,13 @@
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- uint8_t opcode, unused;
+ uint32_t outLen, sequence;
+ uint8_t opcode;
msg.encodeContent (inBuffer);
inBuffer.reset ();
- if (!CheckHeader (inBuffer, &opcode, &unused))
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
{
QPID_LOG (debug, " Invalid content header");
return;
@@ -313,8 +364,7 @@
return;
}
- uint32_t methodId = inBuffer.getLong ();
- uint64_t objId = inBuffer.getLongLong ();
+ uint64_t objId = inBuffer.getLongLong ();
string replyToKey;
const framing::MessageProperties* p =
@@ -330,8 +380,7 @@
return;
}
- EncodeHeader (outBuffer, 'm');
- outBuffer.putLong (methodId);
+ EncodeHeader (outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
if (iter == managementObjects.end ())
@@ -349,22 +398,20 @@
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementAgent::handleHello (Buffer&, string replyToKey)
+void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- uint8_t* dat = (uint8_t*) "Broker ID";
- EncodeHeader (outBuffer, 'I');
- outBuffer.putShort (9);
- outBuffer.putRawData (dat, 9);
+ EncodeHeader (outBuffer, 'b', sequence);
+ uuid.encode (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey)
+void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence)
{
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -373,15 +420,17 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
+ EncodeHeader (outBuffer, 'p', sequence);
EncodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
+
+ sendCommandComplete (replyToKey, sequence);
}
-void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/)
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
std::string packageName;
@@ -389,7 +438,7 @@
FindOrAddPackage (packageName);
}
-void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
std::string packageName;
@@ -405,16 +454,18 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q');
+ EncodeHeader (outBuffer, 'q', sequence);
EncodeClassIndication (outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
}
+
+ sendCommandComplete (replyToKey, sequence);
}
-void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -436,7 +487,7 @@
if (classInfo.writeSchemaCall != 0)
{
- EncodeHeader (outBuffer, 's');
+ EncodeHeader (outBuffer, 's', sequence);
classInfo.writeSchemaCall (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
@@ -459,7 +510,7 @@
return nextRemotePrefix++;
}
-void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string label;
uint32_t requestedPrefix;
@@ -472,17 +523,55 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'a');
+ EncodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (assignedPrefix);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
+// Handle a 'G' (get) request: reply with the current content of every
+// managed object whose class name equals the request's "_class" field.
+//
+// inBuffer   - request body, decoded here as a FieldTable.
+// replyToKey - routing key for the replies, which are sent through dExchange.
+// sequence   - request sequence number, echoed in every reply header.
+//
+// Each matching object produces one 'g' message containing its config
+// section followed by its instrumentation section (written with
+// skipHeaders = true).  After the last object a command-complete message is
+// sent.  A request whose "_class" field is missing, empty or not convertible
+// to a string is dropped without any reply.
+void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+    FieldTable ft;
+    FieldTable::ValuePtr value;
+
+    ft.decode (inBuffer);
+    value = ft.get ("_class");
+
+    // Test the pointer before dereferencing it.  Presumably get () yields an
+    // empty ValuePtr when the key is absent -- confirm against FieldTable --
+    // in which case the unguarded value->empty () would dereference null.
+    if (!value || value->empty () || !value->convertsTo<string> ())
+    {
+        // TODO: Send completion with an error code
+        return;
+    }
+
+    string className (value->get<string> ());
+
+    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         ++iter)
+    {
+        ManagementObject::shared_ptr object = iter->second;
+        if (object->getClassName () == className)
+        {
+            // A fresh Buffer over outputBuffer for every object, so each
+            // reply is encoded from the start of the shared storage.
+            Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+            uint32_t outLen;
+
+            EncodeHeader (outBuffer, 'g', sequence);
+            object->writeConfig (outBuffer);
+            object->writeInstrumentation (outBuffer, true);
+            outLen = MA_BUFFER_SIZE - outBuffer.available ();
+            outBuffer.reset ();
+            SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        }
+    }
+
+    sendCommandComplete (replyToKey, sequence);
+}
+
void ManagementAgent::dispatchAgentCommand (Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode, unused;
+ uint8_t opcode;
+ uint32_t sequence;
string replyToKey;
const framing::MessageProperties* p =
@@ -498,15 +587,16 @@
msg.encodeContent (inBuffer);
inBuffer.reset ();
- if (!CheckHeader (inBuffer, &opcode, &unused))
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'H') handleHello (inBuffer, replyToKey);
- else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey);
- else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey);
- else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey);
- else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey);
- else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey);
+ if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence);
}
ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
@@ -528,7 +618,7 @@
EncodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package");
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
return result.first;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Mar 25 13:30:01 2008
@@ -25,19 +25,20 @@
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
#include <qpid/framing/AMQFrame.h>
#include <boost/shared_ptr.hpp>
-namespace qpid {
+namespace qpid {
namespace management {
class ManagementAgent
{
private:
- ManagementAgent (uint16_t interval);
+ ManagementAgent (std::string dataDir, uint16_t interval);
public:
@@ -45,7 +46,7 @@
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
- static void enableManagement (void);
+ static void enableManagement (std::string dataDir, uint16_t interval);
static shared_ptr getAgent (void);
static void shutdown (void);
@@ -130,10 +131,12 @@
static shared_ptr agent;
static bool enabled;
+ qpid::framing::Uuid uuid;
qpid::sys::RWlock userLock;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
broker::Exchange::shared_ptr dExchange;
+ std::string dataDir;
uint16_t interval;
uint64_t nextObjectId;
uint32_t nextRemotePrefix;
@@ -143,8 +146,8 @@
char outputBuffer[MA_BUFFER_SIZE];
void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void SendBuffer (qpid::framing::Buffer& buf,
uint32_t length,
broker::Exchange::shared_ptr exchange,
@@ -164,16 +167,17 @@
PackageMap::iterator pIter,
ClassMap::iterator cIter);
uint32_t assignPrefix (uint32_t requestedPrefix);
- void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
+ void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
};
}}
-
-
#endif /*!_ManagementAgent_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Tue Mar 25 13:30:01 2008
@@ -57,6 +57,7 @@
static const uint8_t TYPE_BOOL = 11;
static const uint8_t TYPE_FLOAT = 12;
static const uint8_t TYPE_DOUBLE = 13;
+ static const uint8_t TYPE_UUID = 14;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
@@ -85,7 +86,8 @@
virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
virtual bool firstInstance (void) = 0;
virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
- virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+ virtual void writeInstrumentation (qpid::framing::Buffer& buf,
+ bool skipHeaders = false) = 0;
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
Modified: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py Tue Mar 25 13:30:01 2008
@@ -111,6 +111,10 @@
finally:
self.lock.release ()
+ def ctrlHandler (self, context, op, data):
+   """ Control callback handed to the management client.  No control
+   operation triggers any action yet; CTRL_BROKER_INFO is merely
+   recognised. """
+   if op != self.mclient.CTRL_BROKER_INFO:
+     return
+   # CTRL_BROKER_INFO: nothing to do.
+
def configHandler (self, context, className, list, timestamps):
self.dataHandler (0, className, list, timestamps);
@@ -149,7 +153,7 @@
self.client.start ({"LOGIN": username, "PASSWORD": password})
self.channel = self.client.channel (1)
- self.mclient = managementClient (self.spec, None, self.configHandler,
+ self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
self.instHandler, self.methodReply)
self.mclient.schemaListener (self.schemaHandler)
self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb)
@@ -194,6 +198,8 @@
return "False"
else:
return "True"
+ elif typecode == 14:
+ return str (UUID (bytes=value))
return "*type-error*"
def getObjIndex (self, className, config):
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Tue Mar 25 13:30:01 2008
@@ -69,12 +69,14 @@
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
response = ch.session_open (detached_lifetime=300)
+ self.sessionId = response.session_id
self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id)
self.qpidChannel = ch
self.tcb = topicCb
self.rcb = replyCb
self.context = cbContext
+ self.reqsOutstanding = 0
ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
@@ -114,6 +116,10 @@
network. It implements the management protocol and manages the management
schemas as advertised by the various management agents in the network. """
+ CTRL_BROKER_INFO = 1
+ CTRL_SCHEMA_LOADED = 2
+ CTRL_USER = 3
+
#========================================================
# User API - interacts with the class's user
#========================================================
@@ -144,7 +150,7 @@
""" Register a new channel. """
self.channels.append (channel)
codec = Codec (StringIO (), self.spec)
- self.setHeader (codec, ord ('H'))
+ self.setHeader (codec, ord ('B'))
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
msg["routing_key"] = "agent"
@@ -161,6 +167,22 @@
""" Invoke a method on a managed object. """
self.method (channel, userSequence, objId, className, methodName, args)
+ def getObjects (self, channel, userSequence, className):
+   """ Ask the broker for the current content of all objects of a class.
+
+   Sends a 'G' request whose header carries userSequence and whose body is
+   a table with the single entry "_class" = className.  Replies are
+   directed to amq.direct with the channel's reply queue name as key. """
+   encoder = Codec (StringIO (), self.spec)
+   self.setHeader (encoder, ord ('G'), userSequence)
+   encoder.encode_table ({"_class": className})
+
+   msg = Content (encoder.stream.getvalue ())
+   msg["content_type"] = "application/octet-stream"
+   msg["routing_key"] = "agent"
+   msg["reply_to"] = self.spec.struct ("reply_to")
+   msg["reply_to"]["exchange_name"] = "amq.direct"
+   msg["reply_to"]["routing_key"] = channel.replyName
+   channel.send ("qpid.management", msg)
+
+
#========================================================
# Channel API - interacts with registered channel objects
#========================================================
@@ -182,9 +204,11 @@
return
if hdr[0] == 'm':
- self.handleMethodReply (ch, codec)
- elif hdr[0] == 'I':
- self.handleInit (ch, codec)
+ self.handleMethodReply (ch, codec, hdr[1])
+ elif hdr[0] == 'z':
+ self.handleCommandComplete (ch, codec, hdr[1])
+ elif hdr[0] == 'b':
+ self.handleBrokerResponse (ch, codec)
elif hdr[0] == 'p':
self.handlePackageInd (ch, codec)
elif hdr[0] == 'q':
@@ -196,14 +220,13 @@
#========================================================
# Internal Functions
#========================================================
- def setHeader (self, codec, opcode, cls = 0):
+ def setHeader (self, codec, opcode, seq = 0):
""" Compose the header of a management message. """
codec.encode_octet (ord ('A'))
codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('0'))
codec.encode_octet (ord ('1'))
codec.encode_octet (opcode)
- codec.encode_octet (cls)
+ codec.encode_long (seq)
def checkHeader (self, codec):
""" Check the header of a management message and extract the opcode and
@@ -215,14 +238,11 @@
if octet != 'M':
return None
octet = chr (codec.decode_octet ())
- if octet != '0':
- return None
- octet = chr (codec.decode_octet ())
if octet != '1':
return None
opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
- return (opcode, cls)
+ seq = codec.decode_long ()
+ return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
@@ -252,6 +272,8 @@
codec.encode_float (float (value))
elif typecode == 13: # DOUBLE
codec.encode_double (double (value))
+ elif typecode == 14: # UUID
+ codec.encode_uuid (value)
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -283,14 +305,24 @@
data = codec.decode_float ()
elif typecode == 13: # DOUBLE
data = codec.decode_double ()
+ elif typecode == 14: # UUID
+ data = codec.decode_uuid ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
- def handleMethodReply (self, ch, codec):
- sequence = codec.decode_long ()
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
+ def incOutstanding (self, ch):
+   """ Record one more outstanding request on channel ch. """
+   ch.reqsOutstanding += 1
+
+ def decOutstanding (self, ch):
+   """ Record that one outstanding request on channel ch has finished.
+   When the count reaches zero and a control callback is registered, the
+   callback is invoked with CTRL_SCHEMA_LOADED. """
+   ch.reqsOutstanding -= 1
+   if ch.reqsOutstanding == 0 and self.ctrlCb != None:
+     self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
+
+ def handleMethodReply (self, ch, codec, sequence):
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
data = self.seqMgr.release (sequence)
if data == None:
@@ -317,15 +349,27 @@
if self.methodCb != None:
self.methodCb (ch.context, userSequence, status, sText, args)
- def handleInit (self, ch, codec):
- len = codec.decode_short ()
- data = codec.decode_raw (len)
+ def handleCommandComplete (self, ch, codec, seq):
+   """ Process a command-complete ('z') message.
+
+   Decodes the completion code and text, then releases seq from the
+   sequence manager.  If the released context is "outstanding" the
+   channel's outstanding-request count is decremented; otherwise the
+   control callback, if any, receives CTRL_USER with (seq, code, text). """
+   code = codec.decode_long ()
+   text = codec.decode_shortstr ()
+
+   if self.seqMgr.release (seq) == "outstanding":
+     self.decOutstanding (ch)
+     return
+
+   if self.ctrlCb != None:
+     self.ctrlCb (ch.context, self.CTRL_USER, (seq, code, text))
+
+ def handleBrokerResponse (self, ch, codec):
if self.ctrlCb != None:
- self.ctrlCb (ch.context, len, data)
+ uuid = codec.decode_uuid ()
+ data = (uuid, ch.sessionId)
+ self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
# Send a package request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('P'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('P'), seq)
+ self.incOutstanding (ch)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
smsg["routing_key"] = "agent"
@@ -341,7 +385,9 @@
# Send a class request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('Q'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('Q'), seq)
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
@@ -362,6 +408,7 @@
# Send a schema request
sendCodec = Codec (StringIO (), self.spec)
self.setHeader (sendCodec, ord ('S'))
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
sendCodec.encode_shortstr (cname)
sendCodec.encode_bin128 (hash)
@@ -373,8 +420,9 @@
smsg["reply_to"]["routing_key"] = ch.replyName
ch.send ("qpid.management", smsg)
- def parseSchema (self, ch, cls, codec):
+ def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
+ self.decOutstanding (ch)
packageName = codec.decode_shortstr ()
className = codec.decode_shortstr ()
hash = codec.decode_bin128 ()
@@ -495,7 +543,7 @@
def parseContent (self, ch, cls, codec):
""" Parse a received content message. """
- if cls == 'C' and self.configCb == None:
+ if (cls == 'C' or cls == 'B') and self.configCb == None:
return
if cls == 'I' and self.instCb == None:
return
@@ -516,23 +564,39 @@
timestamps.append (codec.decode_longlong ()) # Delete Time
schemaClass = self.schema[classKey]
- for element in schemaClass[cls][:]:
- tc = element[1]
- name = element[0]
- data = self.decodeValue (codec, tc)
- row.append ((name, data))
+ if cls == 'C' or cls == 'B':
+ for element in schemaClass['C'][:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'I' or cls == 'B':
+ if cls == 'B':
+ start = 1
+ else:
+ start = 0
+ for element in schemaClass['I'][start:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
- if cls == 'C':
+ if cls == 'C' or cls == 'B':
self.configCb (ch.context, classKey, row, timestamps)
elif cls == 'I':
self.instCb (ch.context, classKey, row, timestamps)
- def parse (self, ch, codec, opcode, cls):
+ def parse (self, ch, codec, opcode, seq):
""" Parse a message received from the topic queue. """
if opcode == 's':
- self.parseSchema (ch, cls, codec)
- elif opcode == 'C':
- self.parseContent (ch, cls, codec)
+ self.parseSchema (ch, codec)
+ elif opcode == 'c':
+ self.parseContent (ch, 'C', codec)
+ elif opcode == 'i':
+ self.parseContent (ch, 'I', codec)
+ elif opcode == 'g':
+ self.parseContent (ch, 'B', codec)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
@@ -540,8 +604,7 @@
""" Invoke a method on an object """
codec = Codec (StringIO (), self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
- self.setHeader (codec, ord ('M'))
- codec.encode_long (sequence) # Method sequence id
+ self.setHeader (codec, ord ('M'), sequence)
codec.encode_longlong (objId) # ID of object
# Encode args according to schema
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Mar 25 13:30:01 2008
@@ -93,6 +93,15 @@
<!--
===============================================================
+ Management Agent
+ ===============================================================
+ -->
+ <class name="agent">
+ <configElement name="id" type="uuid" access="RO" index="y" desc="Agent ID"/>
+ </class>
+
+ <!--
+ ===============================================================
Virtual Host
===============================================================
-->
Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Tue Mar 25 13:30:01 2008
@@ -19,18 +19,19 @@
under the License.
-->
-<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
-<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
-<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
-<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
-<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
-<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
-<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
-<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
+<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
+<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
+<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
+<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
+<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
+<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
+<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
+<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
+<type name="uuid" base="UUID" cpp="framing::Uuid" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>