You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/15 17:51:15 UTC
svn commit: r704944 - in /incubator/qpid/trunk/qpid:
cpp/managementgen/qmf/templates/ cpp/src/qpid/agent/
cpp/src/qpid/management/ python/commands/ python/qpid/
Author: tross
Date: Wed Oct 15 08:51:15 2008
New Revision: 704944
URL: http://svn.apache.org/viewvc?rev=704944&view=rev
Log:
QPID-1360 - Scaling improvements for QMF
Modified:
incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats
incubator/qpid/trunk/qpid/python/qpid/management.py
incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h Wed Oct 15 08:51:15 2008
@@ -55,11 +55,11 @@
struct PerThreadStats** perThreadStatsArray;
inline struct PerThreadStats* getThreadStats() {
- int index = getThreadIndex();
- struct PerThreadStats* threadStats = perThreadStatsArray[index];
+ int idx = getThreadIndex();
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
if (threadStats == 0) {
threadStats = new(PerThreadStats);
- perThreadStatsArray[index] = threadStats;
+ perThreadStatsArray[idx] = threadStats;
/*MGEN:Class.InitializePerThreadElements*/
}
return threadStats;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Oct 15 08:51:15 2008
@@ -606,13 +606,11 @@
moveNewObjectsLH();
- if (clientWasAdded)
- {
+ if (clientWasAdded) {
clientWasAdded = false;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
- iter++)
- {
+ iter++) {
ManagementObject* object = iter->second;
object->setAllChanged();
}
@@ -620,39 +618,64 @@
if (managementObjects.empty())
return;
-
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++)
- {
- ManagementObject* object = iter->second;
+ iter->second->setFlags(0);
- if (object->getConfigChanged() || object->isDeleted())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
+ //
+ // Process the entire object map.
+ //
+ for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second;
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->isDeleted()))
+ continue;
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
- }
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ for (ManagementObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+
+ if (object->getConfigChanged() || object->isDeleted()) {
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ }
- if (object->getInstChanged())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
+ if (object->getInstChanged()) {
+ encodeHeader(msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
- contentSize = BUFSIZE - msgBuffer.available();
+ if (msgBuffer.available() < (BUFSIZE / 2))
+ break;
+ }
+ }
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
msgBuffer.reset();
- routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName();
+ routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName();
connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
-
- if (object->isDeleted())
- deleteList.push_back(iter->first);
}
// Delete flagged objects
@@ -737,7 +760,9 @@
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
msg.setData(data);
- session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(std::exception&) {}
}
void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed Oct 15 08:51:15 2008
@@ -291,26 +291,26 @@
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::sendBuffer (Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+void ManagementBroker::sendBuffer(Buffer& buf,
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey)
{
if (exchange.get() == 0)
return;
- intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(
ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header (in_place<AMQHeaderBody>());
+ AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
content.castBody<AMQContentBody>()->decode(buf, length);
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
msg->getFrames().append(method);
msg->getFrames().append(header);
@@ -321,7 +321,9 @@
msg->getFrames().append(content);
DeliverableMessage deliverable (msg);
- exchange->route (deliverable, routingKey, 0);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(std::exception&) {}
}
void ManagementBroker::moveNewObjectsLH()
@@ -385,7 +387,7 @@
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -397,7 +399,7 @@
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -1018,7 +1020,7 @@
return;
// No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
+ QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" <<
key.name);
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Oct 15 08:51:15 2008
@@ -122,6 +122,7 @@
sys::Mutex accessLock;
ManagementAgent* agent;
int maxThreads;
+ uint32_t flags;
static int nextThreadIndex;
@@ -164,6 +165,15 @@
deleted = true;
}
inline bool isDeleted (void) { return deleted; }
+ inline void setFlags(uint32_t f) { flags = f; }
+ inline uint32_t getFlags() { return flags; }
+ bool isSameClass(ManagementObject& other) {
+ for (int idx = 0; idx < 16; idx++)
+ if (other.getMd5Sum()[idx] != getMd5Sum()[idx])
+ return false;
+ return other.getClassName() == getClassName() &&
+ other.getPackageName() == getPackageName();
+ }
};
typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats Wed Oct 15 08:51:15 2008
@@ -26,120 +26,96 @@
import socket
import qpid
from threading import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
+from qpid.qmfconsole import Session, Console
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
from qpid.util import connect
from time import sleep
-class mgmtObject (object):
- """ Generic object that holds the contents of a management object with its
- attributes set as object attributes. """
-
- def __init__ (self, classKey, timestamps, row):
- self.classKey = classKey
- self.timestamps = timestamps
- for cell in row:
- setattr (self, cell[0], cell[1])
-
-
-
-class BrokerManager:
- def __init__ (self):
- self.dest = None
- self.src = None
- self.broker = None
- self.objects = {}
- self.filter = None
-
- def SetBroker (self, broker):
- self.broker = broker
-
- def ConnectToBroker (self):
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (self.broker.host, self.broker.port),
- username=self.broker.username, password=self.broker.password)
- self.conn.start ()
- self.session = self.conn.session(self.sessionId)
- self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb)
- self.mchannel = self.mclient.addChannel (self.session)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
-
- def setFilter(self,filter):
- self.filter = filter
-
- def Disconnect (self):
- self.mclient.removeChannel (self.mchannel)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
-
- def configCb (self, context, classKey, row, timestamps):
- className = classKey[1]
- if className != "queue":
- return
-
- obj = mgmtObject (classKey, timestamps, row)
- if obj.id not in self.objects:
- self.objects[obj.id] = (obj.name, None, None)
-
- def instCb (self, context, classKey, row, timestamps):
- className = classKey[1]
- if className != "queue":
- return
-
- obj = mgmtObject (classKey, timestamps, row)
- if obj.id not in self.objects:
- return
-
- (name, first, last) = self.objects[obj.id]
- if first == None:
- self.objects[obj.id] = (name, obj, None)
- return
+class BrokerManager(Console):
+ def __init__(self, host):
+ self.url = host
+ self.objects = {}
+ self.filter = None
+ self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, userBindings=True)
+ try:
+ self.broker = self.session.addBroker(self.url)
+ except socket.error, e:
+ print "Socket Error %s - %s" % (e[0], e[1])
+ sys.exit (1)
+ except Closed, e:
+ print "Connect Failed %d - %s" % (e[0], e[1])
+ sys.exit (1)
+ except ConnectionFailed, e:
+ print "Connect Failed %d - %s" % (e[0], e[1])
+ sys.exit(1)
+
+ def setFilter(self,filter):
+ self.filter = filter
+
+ def objectProps(self, broker, record):
+ className = record.getClassKey()[1]
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ self.objects[id] = (record.name, None, None)
+
+ def objectStats(self, broker, record):
+ className = record.getClassKey()[1]
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ return
+
+ (name, first, last) = self.objects[id]
+ if first == None:
+ self.objects[id] = (name, record, None)
+ return
- if len(self.filter) > 0 :
- match = False
+ if len(self.filter) > 0 :
+ match = False
- for x in self.filter:
- if x.match(name):
- match = True
- break
- if match == False:
- return
-
- if last == None:
- lastSample = first
- else:
- lastSample = last
-
- self.objects[obj.id] = (name, first, obj)
-
- deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0])
- enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0)
- dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0)
- print "%-41s%10.2f%11d%13.2f%13.2f" % \
- (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate)
-
-
- def Display (self):
- self.ConnectToBroker ()
- print "Queue Name Sec Depth Enq Rate Deq Rate"
- print "========================================================================================"
- try:
- while True:
- sleep (1)
- except KeyboardInterrupt:
- pass
- self.Disconnect ()
+ for x in self.filter:
+ if x.match(name):
+ match = True
+ break
+ if match == False:
+ return
+
+ if last == None:
+ lastSample = first
+ else:
+ lastSample = last
+
+ self.objects[id] = (name, first, record)
+
+ deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+ enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+ (deltaTime / 1000000000.0)
+ dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+ (deltaTime / 1000000000.0)
+ print "%-41s%10.2f%11d%13.2f%13.2f" % \
+ (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+
+
+ def Display (self):
+ classes = self.session.getClasses("org.apache.qpid.broker")
+ for cls in classes:
+ if cls[1] == "queue":
+ queueClassKey = cls
+ self.session.bindClass(queueClassKey)
+ print "Queue Name Sec Depth Enq Rate Deq Rate"
+ print "========================================================================================"
+ try:
+ while True:
+ sleep (1)
+ except KeyboardInterrupt:
+ print
+ self.session.delBroker(self.broker)
##
## Main Program
@@ -157,8 +133,7 @@
for s in options.filter.split(","):
filter.append(re.compile(s))
- bm = BrokerManager ()
- bm.SetBroker (Broker (host))
+ bm = BrokerManager(host)
bm.setFilter(filter)
bm.Display()
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Oct 15 08:51:15 2008
@@ -162,8 +162,12 @@
ssn.exchange_bind (exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
- ssn.message_subscribe (queue=self.topicName, destination="tdest")
- ssn.message_subscribe (queue=self.replyName, destination="rdest")
+ ssn.message_subscribe (queue=self.topicName, destination="tdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
+ ssn.message_subscribe (queue=self.replyName, destination="rdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
ssn.incoming ("rdest").listen (self.replyCb)
@@ -202,9 +206,6 @@
if self.enabled:
self.qpidChannel.message_transfer (destination=exchange, message=msg)
- def accept (self, msg):
- self.qpidChannel.message_accept(RangedSet(msg.id))
-
def message (self, body, routing_key="broker"):
dp = self.qpidChannel.delivery_properties()
dp.routing_key = routing_key
@@ -349,28 +350,27 @@
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
codec = Codec (self.spec, msg.body)
- hdr = self.checkHeader (codec)
- if hdr == None:
- raise ValueError ("outer header invalid");
+ while True:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ return
- if hdr[0] == 'p':
- self.handlePackageInd (ch, codec)
- elif hdr[0] == 'q':
- self.handleClassInd (ch, codec)
- elif hdr[0] == 'h':
- self.handleHeartbeat (ch, codec)
- elif hdr[0] == 'e':
- self.handleEvent (ch, codec)
- else:
- self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
+ if hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ elif hdr[0] == 'h':
+ self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
- ch.accept(msg)
return
if hdr[0] == 'm':
@@ -385,7 +385,6 @@
self.handleClassInd (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
def exceptCb (self, ch, data):
if self.closeCb != None:
@@ -403,20 +402,22 @@
codec.write_uint32 (seq)
def checkHeader (self, codec):
- """ Check the header of a management message and extract the opcode and
- class. """
- octet = chr (codec.read_uint8 ())
- if octet != 'A':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != 'M':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != '2':
+ """ Check the header of a management message and extract the opcode and class. """
+ try:
+ octet = chr (codec.read_uint8 ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != '2':
+ return None
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
+ return (opcode, seq)
+ except:
return None
- opcode = chr (codec.read_uint8 ())
- seq = codec.read_uint32 ()
- return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Wed Oct 15 08:51:15 2008
@@ -62,11 +62,11 @@
""" Invoked when a QMF agent disconects. """
pass
- def objectProps(self, broker, id, record):
+ def objectProps(self, broker, record):
""" Invoked when an object is updated. """
pass
- def objectStats(self, broker, id, record):
+ def objectStats(self, broker, record):
""" Invoked when an object is updated. """
pass
@@ -111,10 +111,10 @@
_CONTEXT_STARTUP = 2
_CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 10
+ GET_WAIT_TIME = 60
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
- manageConnections=False):
+ manageConnections=False, userBindings=False):
"""
Initialize a session. If the console argument is provided, the
more advanced asynchronous features are available. If console is
@@ -131,6 +131,12 @@
If manageConnections is set to False, the user is responsible for handing failures. In
this case, an unreachable broker will cause addBroker to raise an exception.
+
+ If userBindings is set to False (the default) and rcvObjects is True, the console will
+ receive data for all object classes. If userBindings is set to True, the user must select
+ which classes the console shall receive by invoking the bindPackage or bindClass methods.
+ This allows the console to be configured to receive only information that is relavant to
+ a particular application. If rcvObjects id False, userBindings has no meaning.
"""
self.console = console
self.brokers = []
@@ -141,14 +147,21 @@
self.getResult = []
self.getSelect = []
self.error = None
+ self.rcvObjects = rcvObjects
+ self.rcvEvents = rcvEvents
+ self.rcvHeartbeats = rcvHeartbeats
+ self.userBindings = userBindings
if self.console == None:
- rcvObjects = False
- rcvEvents = False
- rcvHeartbeats = False
- self.bindingKeyList = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats)
+ self.rcvObjects = False
+ self.rcvEvents = False
+ self.rcvHeartbeats = False
+ self.bindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
- if (manageConnections):
+ if self.userBindings and not self.rcvObjects:
+ raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
+ if manageConnections:
raise Exception("manageConnections - not yet implemented")
def __repr__(self):
@@ -200,6 +213,23 @@
if (cname, hash) in self.packages[pname]:
return self.packages[pname][(cname, hash)]
+ def bindPackage(self, packageName):
+ """ """
+ if not self.userBindings or not self.rcvObjects:
+ raise Exception("userBindings option not set for Session")
+ for broker in self.brokers:
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key="console.obj.%s" % packageName)
+
+ def bindClass(self, classKey):
+ """ """
+ if not self.userBindings or not self.rcvObjects:
+ raise Exception("userBindings option not set for Session")
+ pname, cname, hash = classKey
+ for broker in self.brokers:
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key="console.obj.%s.%s" % (pname, cname))
+
def getAgents(self, broker=None):
""" Get a list of currently known agents """
brokerList = []
@@ -322,20 +352,19 @@
""" """
pass
- def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats):
+ def _bindingKeys(self):
keyList = []
keyList.append("schema.#")
- if rcvObjects and rcvEvents and rcvHeartbeats:
+ if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
keyList.append("console.#")
else:
- if rcvObjects:
- keyList.append("console.prop.#")
- keyList.append("console.stat.#")
+ if self.rcvObjects and not self.userBindings:
+ keyList.append("console.obj.#")
else:
- keyList.append("console.prop.org.apache.qpid.broker.agent")
- if rcvEvents:
+ keyList.append("console.obj.org.apache.qpid.broker.agent")
+ if self.rcvEvents:
keyList.append("console.event.#")
- if rcvHeartbeats:
+ if self.rcvHeartbeats:
keyList.append("console.heartbeat")
return keyList
@@ -488,9 +517,9 @@
if self.console != None:
if prop:
- self.console.objectProps(broker, object.getObjectId(), object)
+ self.console.objectProps(broker, object)
if stat:
- self.console.objectStats(broker, object.getObjectId(), object)
+ self.console.objectStats(broker, object)
def _handleError(self, error):
self.error = error
@@ -858,6 +887,15 @@
def getStatistics(self):
return self._statistics
+ def mergeUpdate(self, newer):
+ """ Replace properties and/or statistics with a newly received update """
+ if self._objectId != newer._objectId:
+ raise Exception("Objects with different object-ids")
+ if len(newer.getProperties()) > 0:
+ self.properties = newer.getProperties()
+ if len(newer.getStatistics()) > 0:
+ self.statistics = newer.getStatistics()
+
def __repr__(self):
return self.getIndex()
@@ -960,7 +998,7 @@
class Broker:
""" """
- SYNC_TIME = 10
+ SYNC_TIME = 60
def __init__(self, session, host, port, authMech, authUser, authPass):
self.session = session
@@ -1024,7 +1062,9 @@
self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
- self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest")
+ self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF)
@@ -1032,7 +1072,9 @@
self.topicName = "topic-%s" % self.amqpSessionId
self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
- self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest")
+ self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("tdest").listen(self._replyCb)
self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
@@ -1076,18 +1118,21 @@
def _checkHeader(self, codec):
""" Check the header of a management message and extract the opcode and class. """
- octet = chr(codec.read_uint8())
- if octet != 'A':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != 'M':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != '2':
+ try:
+ octet = chr(codec.read_uint8())
+ if octet != 'A':
+ return None, None
+ octet = chr(codec.read_uint8())
+ if octet != 'M':
+ return None, None
+ octet = chr(codec.read_uint8())
+ if octet != '2':
+ return None, None
+ opcode = chr(codec.read_uint8())
+ seq = codec.read_uint32()
+ return opcode, seq
+ except:
return None, None
- opcode = chr(codec.read_uint8())
- seq = codec.read_uint32()
- return opcode, seq
def _message (self, body, routing_key="broker"):
dp = self.amqpSession.delivery_properties()
@@ -1143,23 +1188,21 @@
self.cv.release()
def _replyCb(self, msg):
- self.amqpSession.message_accept(RangedSet(msg.id))
codec = Codec(self.conn.spec, msg.body)
- opcode, seq = self._checkHeader(codec)
- if opcode == None:
- return
-
- if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
- elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
- elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
+ while True:
+ opcode, seq = self._checkHeader(codec)
+ if opcode == None: return
+ if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
+ elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+ elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
+ elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
+ elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq)
+ elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
+ elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
+ elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
+ elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
self.isConnected = False
@@ -1286,10 +1329,10 @@
def delAgent(self, agent):
print "delAgent:", agent
- def objectProps(self, broker, id, record):
+ def objectProps(self, broker, record):
print "objectProps:", record.getClassKey()
- def objectStats(self, broker, id, record):
+ def objectStats(self, broker, record):
print "objectStats:", record.getClassKey()
def event(self, broker, event):