You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/15 21:49:26 UTC
svn commit: r595453 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/management:
Broker.cpp Broker.h ManagementAgent.cpp ManagementAgent.h
ManagementObject.cpp ManagementObject.h Queue.cpp Queue.h Vhost.cpp Vhost.h
Author: aconway
Date: Thu Nov 15 12:49:25 2007
New Revision: 595453
URL: http://svn.apache.org/viewvc?rev=595453&view=rev
Log:
QPID-687: committed qpid-patch7-cpp.diff qpid-patch7-python.diff
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.cpp Thu Nov 15 12:49:25 2007
@@ -21,6 +21,7 @@
#include "config.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldTable.h"
#include "Broker.h"
#include "ArgsBrokerEcho.h"
@@ -31,7 +32,7 @@
bool Broker::schemaNeeded = true;
Broker::Broker (Manageable* _core, const Options& _conf) :
- ManagementObject (_core)
+ ManagementObject (_core, "broker")
{
broker::Broker::Options& conf = (broker::Broker::Options&) _conf;
@@ -54,28 +55,149 @@
void Broker::writeSchema (Buffer& buf)
{
+ FieldTable ft;
+ FieldTable arg;
+
schemaNeeded = false;
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true);
- schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true);
- schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
- schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true);
- schemaItem (buf, TYPE_UINT16, "connBacklog",
- "Connection backlog limit for listening socket", true);
- schemaItem (buf, TYPE_UINT32, "stagingThreshold",
- "Broker stages messages over this size to disk", true);
- schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true);
- schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true);
- schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
- schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
- "Number of disk pages allocated for storage", true);
- schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
- "Number of disk pages allocated per queue", true);
- schemaItem (buf, TYPE_STRING, "clusterName",
- "Name of cluster this server is a member of, zero-length for standalone server", true);
- schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
- schemaListEnd (buf);
+ // Schema class header:
+ buf.putShortString (className); // Class Name
+ buf.putShort (13); // Config Element Count
+ buf.putShort (0); // Inst Element Count
+ buf.putShort (1); // Method Count
+ buf.putShort (0); // Event Count
+
+ // Config Elements
+ ft = FieldTable ();
+ ft.setString ("name", "systemRef");
+ ft.setInt ("type", TYPE_U64);
+ ft.setInt ("access", ACCESS_RC);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "System ID");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "port");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RC);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "TCP Port for AMQP Service");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "workerThreads");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Thread pool size");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "maxConns");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Maximum allowed connections");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "connBacklog");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Connection backlog limit for listening socket");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "stagingThreshold");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Broker stages messages over this size to disk");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "storeLib");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Name of persistent storage library");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "asyncStore");
+ ft.setInt ("type", TYPE_U8);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Use async persistent store");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "mgmtPubInterval");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RW);
+ ft.setInt ("index", 0);
+ ft.setInt ("min", 1);
+ ft.setString ("unit", "second");
+ ft.setString ("desc", "Interval for management broadcasts");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "initialDiskPageSize");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Number of disk pages allocated for storage");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "initialPagesPerQueue");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Number of disk pages allocated per queue");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "clusterName");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Name of cluster this server is a member of, zero-length for standalone server");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "version");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Running software version");
+ buf.put (ft);
+
+ // Inst Elements
+
+ return; // TODO - Remove
+
+ // Methods
+ ft = FieldTable ();
+ ft.setString ("name", "echo");
+ ft.setInt ("args", 2);
+
+ arg = FieldTable ();
+ arg.setString ("name", "sequence");
+ arg.setInt ("type", TYPE_U32);
+ arg.setInt ("dir", DIR_IO);
+ ft.setTable ("arg", arg);
+
+ arg = FieldTable ();
+ arg.setString ("name", "body");
+ arg.setInt ("type", TYPE_LSTR);
+ arg.setInt ("dir", DIR_IO);
+ ft.setTable ("arg", arg);
+
+ buf.put (ft);
+
+ // Events
}
void Broker::writeConfig (Buffer& buf)
@@ -83,7 +205,7 @@
configChanged = false;
writeTimestamps (buf);
- buf.putLong (0);
+ buf.putLongLong (0);
buf.putShort (port);
buf.putShort (workerThreads);
buf.putShort (maxConns);
@@ -99,8 +221,8 @@
}
void Broker::doMethod (string methodName,
- Buffer& inBuf,
- Buffer& outBuf)
+ Buffer& inBuf,
+ Buffer& outBuf)
{
if (methodName.compare ("echo") == 0)
{
@@ -116,6 +238,12 @@
outBuf.putShortString ("OK");
outBuf.putLong (args.io_sequence);
outBuf.putLongString (args.io_body);
+ }
+
+ // TODO - Remove this method prior to beta
+ else if (methodName.compare ("crash") == 0)
+ {
+ assert (0);
}
else
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Broker.h Thu Nov 15 12:49:25 2007
@@ -56,16 +56,14 @@
std::string clusterName;
std::string version;
- uint16_t getObjectType (void) { return OBJECT_BROKER; }
- std::string getObjectName (void) { return "broker"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
- void doMethod (std::string methodName,
- qpid::framing::Buffer& inBuf,
- qpid::framing::Buffer& outBuf);
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+ void setSchemaNeeded (void) { schemaNeeded = true; }
+ void doMethod (std::string methodName,
+ qpid::framing::Buffer& inBuf,
+ qpid::framing::Buffer& outBuf);
inline bool getInstChanged (void) { return false; }
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Nov 15 12:49:25 2007
@@ -84,26 +84,60 @@
}
}
+void ManagementAgent::EncodeHeader (Buffer& buf)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('0');
+ buf.putOctet ('1');
+}
+
+void ManagementAgent::SendBuffer (Buffer& buf,
+ uint32_t length,
+ Exchange::shared_ptr exchange,
+ string routingKey)
+{
+ intrusive_ptr<Message> msg (new Message ());
+ AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
+ 0, exchange->getName (), 0, 0));
+ AMQFrame header (0, AMQHeaderBody());
+ AMQFrame content;
+
+ QPID_LOG (debug, "ManagementAgent::SendBuffer - key="
+ << routingKey << " len=" << length);
+
+ content.setBody(AMQContentBody());
+ content.castBody<AMQContentBody>()->decode(buf, length);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(length);
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ exchange->route (deliverable, routingKey, 0);
+}
+
void ManagementAgent::PeriodicProcessing (void)
{
#define BUFSIZE 65536
#define THRESHOLD 16384
char msgChars[BUFSIZE];
- Buffer msgBuffer (msgChars, BUFSIZE);
uint32_t contentSize;
+ string routingKey;
std::list<uint64_t> deleteList;
if (managementObjects.empty ())
return;
- intrusive_ptr<Message> msg (new Message ());
-
- // Build the magic number for the management message.
- msgBuffer.putOctet ('A');
- msgBuffer.putOctet ('M');
- msgBuffer.putOctet ('0');
- msgBuffer.putOctet ('1');
-
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
@@ -112,105 +146,56 @@
if (object->getSchemaNeeded ())
{
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer);
msgBuffer.putOctet ('S'); // opcode = Schema Record
msgBuffer.putOctet (0); // content-class = N/A
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
object->writeSchema (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt.schema." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getConfigChanged ())
{
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer);
msgBuffer.putOctet ('C'); // opcode = Content Record
msgBuffer.putOctet ('C'); // content-class = Configuration
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
object->writeConfig (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt.config." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer);
msgBuffer.putOctet ('C'); // opcode = Content Record
msgBuffer.putOctet ('I'); // content-class = Instrumentation
- msgBuffer.putShort (object->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
object->writeInstrumentation (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt.inst." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->isDeleted ())
deleteList.push_back (iter->first);
-
- // Temporary protection against buffer overrun.
- // This needs to be replaced with frame fragmentation.
- if (msgBuffer.available () < THRESHOLD)
- break;
}
- msgBuffer.putOctet ('X'); // End-of-message
- msgBuffer.putOctet (0);
- msgBuffer.putShort (0);
- msgBuffer.putLong (8);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
-
- AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
- 0, "qpid.management", 0, 0));
- AMQFrame header (0, AMQHeaderBody());
- AMQFrame content;
-
- content.setBody(AMQContentBody());
- content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(contentSize);
- msg->getFrames().append(content);
-
- DeliverableMessage deliverable (msg);
- mExchange->route (deliverable, "mgmt", 0);
-
// Delete flagged objects
for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
iter != deleteList.rend ();
iter++)
- {
managementObjects.erase (*iter);
- }
+
deleteList.clear ();
}
@@ -264,10 +249,11 @@
if (contentSize < 8 || contentSize > 65536)
return;
- char *inMem = new char[contentSize];
- char outMem[4096]; // TODO Fix This
- Buffer inBuffer (inMem, contentSize);
- Buffer outBuffer (outMem, 4096);
+ char *inMem = new char[contentSize];
+ char outMem[4096]; // TODO Fix This
+ Buffer inBuffer (inMem, contentSize);
+ Buffer outBuffer (outMem, 4096);
+ uint32_t outLen;
msg.encodeContent (inBuffer);
inBuffer.reset ();
@@ -294,32 +280,9 @@
iter->second->doMethod (methodName, inBuffer, outBuffer);
}
- intrusive_ptr<Message> outMsg (new Message ());
- uint32_t msgSize = 4096 - outBuffer.available ();
+ outLen = 4096 - outBuffer.available ();
outBuffer.reset ();
- AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
- 0, "amq.direct", 0, 0));
- AMQFrame header (0, AMQHeaderBody());
- AMQFrame content;
-
- content.setBody(AMQContentBody());
- content.castBody<AMQContentBody>()->decode(outBuffer, msgSize);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- outMsg->getFrames().append(method);
- outMsg->getFrames().append(header);
-
- MessageProperties* props = outMsg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(msgSize);
- outMsg->getFrames().append(content);
-
- DeliverableMessage outDeliverable (outMsg);
- dExchange->route (outDeliverable, replyTo, 0);
-
+ SendBuffer (outBuffer, outLen, dExchange, replyTo);
free (inMem);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Nov 15 12:49:25 2007
@@ -72,6 +72,11 @@
uint64_t nextObjectId;
void PeriodicProcessing (void);
+ void EncodeHeader (qpid::framing::Buffer& buf);
+ void SendBuffer (qpid::framing::Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Thu Nov 15 12:49:25 2007
@@ -21,41 +21,17 @@
#include "Manageable.h"
#include "ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
-void ManagementObject::schemaItem (Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig,
- bool isIndex)
-{
- uint8_t flags =
- (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
-
- buf.putOctet (flags);
- buf.putOctet (typeCode);
- buf.putShortString (name);
- buf.putShortString (description);
-}
-
-void ManagementObject::schemaListBegin (Buffer& buf)
-{
- schemaItem (buf, TYPE_UINT64, "id", "Object ID", true, true);
-}
-
-void ManagementObject::schemaListEnd (Buffer& buf)
-{
- buf.putOctet (FLAG_END);
-}
-
void ManagementObject::writeTimestamps (Buffer& buf)
{
- buf.putLongLong (uint64_t (Duration (now ())));
- buf.putLongLong (createTime);
- buf.putLongLong (destroyTime);
- buf.putLongLong (objectId);
+ buf.putShortString (className);
+ buf.putLongLong (uint64_t (Duration (now ())));
+ buf.putLongLong (createTime);
+ buf.putLongLong (destroyTime);
+ buf.putLongLong (objectId);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Thu Nov 15 12:49:25 2007
@@ -31,18 +31,6 @@
namespace qpid {
namespace management {
-const uint16_t OBJECT_SYSTEM = 1;
-const uint16_t OBJECT_BROKER = 2;
-const uint16_t OBJECT_VHOST = 3;
-const uint16_t OBJECT_QUEUE = 4;
-const uint16_t OBJECT_EXCHANGE = 5;
-const uint16_t OBJECT_BINDING = 6;
-const uint16_t OBJECT_CLIENT = 7;
-const uint16_t OBJECT_SESSION = 8;
-const uint16_t OBJECT_DESTINATION = 9;
-const uint16_t OBJECT_PRODUCER = 10;
-const uint16_t OBJECT_CONSUMER = 11;
-
class Manageable;
class ManagementObject
@@ -56,48 +44,48 @@
bool instChanged;
bool deleted;
Manageable* coreObject;
+ std::string className;
- static const uint8_t TYPE_UINT8 = 1;
- static const uint8_t TYPE_UINT16 = 2;
- static const uint8_t TYPE_UINT32 = 3;
- static const uint8_t TYPE_UINT64 = 4;
- static const uint8_t TYPE_BOOL = 5;
- static const uint8_t TYPE_STRING = 6;
+ static const uint8_t TYPE_U8 = 1;
+ static const uint8_t TYPE_U16 = 2;
+ static const uint8_t TYPE_U32 = 3;
+ static const uint8_t TYPE_U64 = 4;
+ static const uint8_t TYPE_SSTR = 6;
+ static const uint8_t TYPE_LSTR = 7;
+
+ static const uint8_t ACCESS_RC = 1;
+ static const uint8_t ACCESS_RW = 1;
+ static const uint8_t ACCESS_RO = 1;
+
+ static const uint8_t DIR_I = 1;
+ static const uint8_t DIR_O = 2;
+ static const uint8_t DIR_IO = 3;
static const uint8_t FLAG_CONFIG = 0x01;
static const uint8_t FLAG_INDEX = 0x02;
static const uint8_t FLAG_END = 0x80;
-
- void schemaItem (qpid::framing::Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig = false,
- bool isIndex = false);
- void schemaListBegin (qpid::framing::Buffer& buf);
- void schemaListEnd (qpid::framing::Buffer& buf);
+
void writeTimestamps (qpid::framing::Buffer& buf);
public:
typedef boost::shared_ptr<ManagementObject> shared_ptr;
- ManagementObject (Manageable* _core) :
+ ManagementObject (Manageable* _core, std::string _name) :
destroyTime(0), objectId (0), configChanged(true),
- instChanged(true), deleted(false), coreObject(_core)
+ instChanged(true), deleted(false), coreObject(_core), className(_name)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
- virtual uint16_t getObjectType (void) = 0;
- virtual std::string getObjectName (void) = 0;
- virtual void writeSchema (qpid::framing::Buffer& buf) = 0;
- virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
- virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
- virtual bool getSchemaNeeded (void) = 0;
- virtual void setSchemaNeeded (void) = 0;
- virtual void doMethod (std::string methodName,
- qpid::framing::Buffer& inBuf,
- qpid::framing::Buffer& outBuf) = 0;
+ virtual void writeSchema (qpid::framing::Buffer& buf) = 0;
+ virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
+ virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+ virtual bool getSchemaNeeded (void) = 0;
+ virtual void setSchemaNeeded (void) = 0;
+ virtual void doMethod (std::string methodName,
+ qpid::framing::Buffer& inBuf,
+ qpid::framing::Buffer& outBuf) = 0;
+ std::string getClassName (void) { return className; }
void setObjectId (uint64_t oid) { objectId = oid; }
uint64_t getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.cpp Thu Nov 15 12:49:25 2007
@@ -20,6 +20,7 @@
*/
#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
#include "Manageable.h"
#include "Queue.h"
@@ -32,7 +33,7 @@
Queue::Queue (Manageable* _core, Manageable* _parent,
const std::string& _name,
bool _durable, bool _autoDelete) :
- ManagementObject(_core), name(_name),
+ ManagementObject(_core, "queue"), name(_name),
durable(_durable), autoDelete(_autoDelete)
{
vhostRef = _parent->GetManagementObject ()->getObjectId ();
@@ -83,47 +84,248 @@
void Queue::writeSchema (Buffer& buf)
{
+ FieldTable ft;
+
schemaNeeded = false;
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT64, "vhostRef", "Virtual Host Ref", true);
- schemaItem (buf, TYPE_STRING, "name", "Queue Name", true);
- schemaItem (buf, TYPE_BOOL, "durable", "Durable", true);
- schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true);
- schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued");
- schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued");
- schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued");
- schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued");
- schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages");
- schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval");
- schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval");
- schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued");
- schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued");
- schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued");
- schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued");
- schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
- schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
- schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes");
- schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval");
- schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started ");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed");
- schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started ");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed");
- schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval");
- schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue");
- schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval");
- schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval");
- schemaListEnd (buf);
+ // Schema class header:
+ buf.putShortString (className); // Class Name
+ buf.putShort (4); // Config Element Count
+ buf.putShort (33); // Inst Element Count
+ buf.putShort (0); // Method Count
+ buf.putShort (0); // Event Count
+
+ // Config Elements
+ ft = FieldTable ();
+ ft.setString ("name", "vhostRef");
+ ft.setInt ("type", TYPE_U64);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "Virtual Host Ref");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "name");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "Queue Name");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "durable");
+ ft.setInt ("type", TYPE_U8);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Durable");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "autoDelete");
+ ft.setInt ("type", TYPE_U8);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "AutoDelete");
+ buf.put (ft);
+
+ // Inst Elements
+ ft = FieldTable ();
+ ft.setString ("name", "msgTotalEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgTotalDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgTxnEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Transactional messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgTxnDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Transactional messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgPersistEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Persistent messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgPersistDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Persistent messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgDepth");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Current size of queue in messages");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgDepthLow");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Low-water queue size, this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "msgDepthHigh");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "High-water queue size, this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteTotalEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteTotalDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteTxnEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Transactional messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteTxnDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Transactional messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "bytePersistEnqueues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Persistent messages enqueued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "bytePersistDequeues");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Persistent messages dequeued");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteDepth");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Current size of queue in bytes");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteDepthLow");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Low-water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "byteDepthHigh");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "High-water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnStarts");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total enqueue transactions started ");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnCommits");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total enqueue transactions committed");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnRejects");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total enqueue transactions rejected");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnCount");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Current pending enqueue transactions");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnCountLow");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Low water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "enqueueTxnCountHigh");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "High water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnStarts");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total dequeue transactions started ");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnCommits");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total dequeue transactions committed");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnRejects");
+ ft.setInt ("type", TYPE_U64);
+ ft.setString ("desc", "Total dequeue transactions rejected");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnCount");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Current pending dequeue transactions");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnCountLow");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Transaction low water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "dequeueTxnCountHigh");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Transaction high water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "consumers");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Current consumers on queue");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "consumersLow");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Consumer low water mark this interval");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "consumersHigh");
+ ft.setInt ("type", TYPE_U32);
+ ft.setString ("desc", "Consumer high water mark this interval");
+ buf.put (ft);
}
void Queue::writeConfig (Buffer& buf)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Queue.h Thu Nov 15 12:49:25 2007
@@ -83,16 +83,14 @@
uint32_t consumersLow; // Low water mark this interval
uint32_t consumersHigh; // High water mark this interval
- uint16_t getObjectType (void) { return OBJECT_QUEUE; }
- std::string getObjectName (void) { return "queue"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& buf);
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
- void doMethod (std::string /*methodName*/,
- qpid::framing::Buffer& /*inBuf*/,
- qpid::framing::Buffer& /*outBuf*/) {}
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& buf);
+ bool getSchemaNeeded (void) { return schemaNeeded; } // reports the current schemaNeeded flag
+ void setSchemaNeeded (void) { schemaNeeded = true; } // marks the schema as needing to be written again
+ void doMethod (std::string /*methodName*/,
+ qpid::framing::Buffer& /*inBuf*/,
+ qpid::framing::Buffer& /*outBuf*/) {} // empty body: all three arguments are ignored
inline void adjustQueueHiLo (void){
if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.cpp Thu Nov 15 12:49:25 2007
@@ -21,6 +21,7 @@
#include "Manageable.h"
#include "Vhost.h"
+#include "qpid/framing/FieldTable.h"
using namespace qpid::management;
using namespace qpid::sys;
@@ -29,7 +30,7 @@
bool Vhost::schemaNeeded = true;
Vhost::Vhost (Manageable* _core, Manageable* _parent) :
- ManagementObject (_core), name("/")
+ ManagementObject (_core, "vhost"), name("/") // NOTE(review): "vhost" presumably becomes the className that writeSchema emits - confirm in ManagementObject
{
brokerRef = _parent->GetManagementObject ()->getObjectId ();
}
@@ -38,12 +39,33 @@
void Vhost::writeSchema (Buffer& buf)
{
+ FieldTable ft; // scratch table, reset and refilled for each config element written below
+
schemaNeeded = false;
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference" , true);
- schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true);
- schemaListEnd (buf);
+ // Schema class header:
+ buf.putShortString (className); // Class Name
+ buf.putShort (2); // Config Element Count
+ buf.putShort (0); // Inst Element Count
+ buf.putShort (0); // Method Count
+ buf.putShort (0); // Event Count
+
+ // Config Elements: one FieldTable per element; two are written, matching the count of 2 in the header
+ ft = FieldTable (); // start from an empty table
+ ft.setString ("name", "brokerRef");
+ ft.setInt ("type", TYPE_U64);
+ ft.setInt ("access", ACCESS_RC); // NOTE(review): ACCESS_RC is defined outside this hunk - confirm its meaning
+ ft.setInt ("index", 1); // NOTE(review): both elements use 1; presumably a flag marking index fields - confirm
+ ft.setString ("desc", "Broker Reference");
+ buf.put (ft); // write this element's descriptor to the buffer
+
+ ft = FieldTable (); // discard the previous element's fields
+ ft.setString ("name", "name");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO); // NOTE(review): ACCESS_RO is defined outside this hunk - confirm its meaning
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "Name of virtual host");
+ buf.put (ft); // write this element's descriptor to the buffer
}
void Vhost::writeConfig (Buffer& buf)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h?rev=595453&r1=595452&r2=595453&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/Vhost.h Thu Nov 15 12:49:25 2007
@@ -45,16 +45,14 @@
uint64_t brokerRef;
std::string name;
- uint16_t getObjectType (void) { return OBJECT_VHOST; }
- std::string getObjectName (void) { return "vhost"; }
- void writeSchema (qpid::framing::Buffer& buf);
- void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
- bool getSchemaNeeded (void) { return schemaNeeded; }
- void setSchemaNeeded (void) { schemaNeeded = true; }
- void doMethod (std::string /*methodName*/,
- qpid::framing::Buffer& /*inBuf*/,
- qpid::framing::Buffer& /*outBuf*/) {}
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} // empty body: nothing is written to the buffer
+ bool getSchemaNeeded (void) { return schemaNeeded; } // reports the current schemaNeeded flag
+ void setSchemaNeeded (void) { schemaNeeded = true; } // marks the schema as needing to be written again
+ void doMethod (std::string /*methodName*/,
+ qpid::framing::Buffer& /*inBuf*/,
+ qpid::framing::Buffer& /*outBuf*/) {} // empty body: all three arguments are ignored
inline bool getInstChanged (void) { return false; }
};