You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/15 17:51:15 UTC

svn commit: r704944 - in /incubator/qpid/trunk/qpid: cpp/managementgen/qmf/templates/ cpp/src/qpid/agent/ cpp/src/qpid/management/ python/commands/ python/qpid/

Author: tross
Date: Wed Oct 15 08:51:15 2008
New Revision: 704944

URL: http://svn.apache.org/viewvc?rev=704944&view=rev
Log:
QPID-1360 - Scaling improvements for QMF

Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/qmf/templates/Class.h Wed Oct 15 08:51:15 2008
@@ -55,11 +55,11 @@
     struct PerThreadStats** perThreadStatsArray;
 
     inline struct PerThreadStats* getThreadStats() {
-        int index = getThreadIndex();
-        struct PerThreadStats* threadStats = perThreadStatsArray[index];
+        int idx = getThreadIndex();
+        struct PerThreadStats* threadStats = perThreadStatsArray[idx];
         if (threadStats == 0) {
             threadStats = new(PerThreadStats);
-            perThreadStatsArray[index] = threadStats;
+            perThreadStatsArray[idx] = threadStats;
 /*MGEN:Class.InitializePerThreadElements*/
         }
         return threadStats;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Oct 15 08:51:15 2008
@@ -606,13 +606,11 @@
 
     moveNewObjectsLH();
 
-    if (clientWasAdded)
-    {
+    if (clientWasAdded) {
         clientWasAdded = false;
         for (ManagementObjectMap::iterator iter = managementObjects.begin();
              iter != managementObjects.end();
-             iter++)
-        {
+             iter++) {
             ManagementObject* object = iter->second;
             object->setAllChanged();
         }
@@ -620,39 +618,64 @@
 
     if (managementObjects.empty())
         return;
-        
+
+    //
+    //  Clear the been-here flag on all objects in the map.
+    //
     for (ManagementObjectMap::iterator iter = managementObjects.begin();
          iter != managementObjects.end();
          iter++)
-    {
-        ManagementObject* object = iter->second;
+        iter->second->setFlags(0);
 
-        if (object->getConfigChanged() || object->isDeleted())
-        {
-            Buffer msgBuffer(msgChars, BUFSIZE);
-            encodeHeader(msgBuffer, 'c');
-            object->writeProperties(msgBuffer);
+    //
+    //  Process the entire object map.
+    //
+    for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+         baseIter != managementObjects.end();
+         baseIter++) {
+        ManagementObject* baseObject = baseIter->second;
+
+        //
+        //  Skip until we find a base object requiring a sent message.
+        //
+        if (baseObject->getFlags() == 1 ||
+            (!baseObject->getConfigChanged() &&
+             !baseObject->getInstChanged() &&
+             !baseObject->isDeleted()))
+            continue;
 
-            contentSize = BUFSIZE - msgBuffer.available();
-            msgBuffer.reset();
-            routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName();
-            connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
-        }
+        Buffer msgBuffer(msgChars, BUFSIZE);
+        for (ManagementObjectMap::iterator iter = baseIter;
+             iter != managementObjects.end();
+             iter++) {
+            ManagementObject* object = iter->second;
+            if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+                object->setFlags(1);
+
+                if (object->getConfigChanged() || object->isDeleted()) {
+                    encodeHeader(msgBuffer, 'c');
+                    object->writeProperties(msgBuffer);
+                }
         
-        if (object->getInstChanged())
-        {
-            Buffer msgBuffer(msgChars, BUFSIZE);
-            encodeHeader(msgBuffer, 'i');
-            object->writeStatistics(msgBuffer);
+                if (object->getInstChanged()) {
+                    encodeHeader(msgBuffer, 'i');
+                    object->writeStatistics(msgBuffer);
+                }
+
+                if (object->isDeleted())
+                    deleteList.push_back(iter->first);
 
-            contentSize = BUFSIZE - msgBuffer.available();
+                if (msgBuffer.available() < (BUFSIZE / 2))
+                    break;
+            }
+        }
+
+        contentSize = BUFSIZE - msgBuffer.available();
+        if (contentSize > 0) {
             msgBuffer.reset();
-            routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName();
+            routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName();
             connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
         }
-
-        if (object->isDeleted())
-            deleteList.push_back(iter->first);
     }
 
     // Delete flagged objects
@@ -737,7 +760,9 @@
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
     msg.setData(data);
-    session.messageTransfer(arg::content=msg, arg::destination=exchange);
+    try {
+        session.messageTransfer(arg::content=msg, arg::destination=exchange);
+    } catch(std::exception&) {}
 }
 
 void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed Oct 15 08:51:15 2008
@@ -291,26 +291,26 @@
     return h1 == 'A' && h2 == 'M' && h3 == '2';
 }
 
-void ManagementBroker::sendBuffer (Buffer&  buf,
-                                   uint32_t length,
-                                   qpid::broker::Exchange::shared_ptr exchange,
-                                   string   routingKey)
+void ManagementBroker::sendBuffer(Buffer&  buf,
+                                  uint32_t length,
+                                  qpid::broker::Exchange::shared_ptr exchange,
+                                  string   routingKey)
 {
     if (exchange.get() == 0)
         return;
 
-    intrusive_ptr<Message> msg (new Message ());
-    AMQFrame method (in_place<MessageTransferBody>(
+    intrusive_ptr<Message> msg(new Message());
+    AMQFrame method(in_place<MessageTransferBody>(
         ProtocolVersion(), exchange->getName (), 0, 0));
-    AMQFrame header (in_place<AMQHeaderBody>());
+    AMQFrame header(in_place<AMQHeaderBody>());
     AMQFrame content(in_place<AMQContentBody>());
 
     content.castBody<AMQContentBody>()->decode(buf, length);
 
-    method.setEof  (false);
-    header.setBof  (false);
-    header.setEof  (false);
-    content.setBof (false);
+    method.setEof(false);
+    header.setBof(false);
+    header.setEof(false);
+    content.setBof(false);
 
     msg->getFrames().append(method);
     msg->getFrames().append(header);
@@ -321,7 +321,9 @@
     msg->getFrames().append(content);
 
     DeliverableMessage deliverable (msg);
-    exchange->route (deliverable, routingKey, 0);
+    try {
+        exchange->route(deliverable, routingKey, 0);
+    } catch(std::exception&) {}
 }
 
 void ManagementBroker::moveNewObjectsLH()
@@ -385,7 +387,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName ();
+            routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
         
@@ -397,7 +399,7 @@
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName ();
+            routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
             sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 
@@ -1018,7 +1020,7 @@
         return;
 
     // No such class found, create a new class with local information.
-    QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
+    QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" <<
               key.name);
 
     cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Oct 15 08:51:15 2008
@@ -122,6 +122,7 @@
     sys::Mutex       accessLock;
     ManagementAgent* agent;
     int              maxThreads;
+    uint32_t         flags;
 
     static int nextThreadIndex;
         
@@ -164,6 +165,15 @@
         deleted     = true;
     }
     inline bool isDeleted (void) { return deleted; }
+    inline void setFlags(uint32_t f) { flags = f; }
+    inline uint32_t getFlags() { return flags; }
+    bool isSameClass(ManagementObject& other) {
+        for (int idx = 0; idx < 16; idx++)
+            if (other.getMd5Sum()[idx] != getMd5Sum()[idx])
+                return false;
+        return other.getClassName() == getClassName() &&
+            other.getPackageName() == getPackageName();
+    }
 };
 
 typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-queue-stats Wed Oct 15 08:51:15 2008
@@ -26,120 +26,96 @@
 import socket
 import qpid
 from threading       import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
+from qpid.qmfconsole import Session, Console
 from qpid.peer       import Closed
 from qpid.connection import Connection, ConnectionFailed
 from qpid.util       import connect
 from time            import sleep
 
-class mgmtObject (object):
-  """ Generic object that holds the contents of a management object with its
-      attributes set as object attributes. """
-
-  def __init__ (self, classKey, timestamps, row):
-    self.classKey   = classKey
-    self.timestamps = timestamps
-    for cell in row:
-      setattr (self, cell[0], cell[1])
-
-
-
-class BrokerManager:
-    def __init__ (self):
-        self.dest    = None
-        self.src     = None
-        self.broker  = None
-        self.objects = {}
-        self.filter  = None   
-
-    def SetBroker (self, broker):
-        self.broker = broker
-
-    def ConnectToBroker (self):
-        try:
-            self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
-            self.conn     = Connection (connect (self.broker.host, self.broker.port),
-                                    username=self.broker.username, password=self.broker.password)
-            self.conn.start ()
-            self.session  = self.conn.session(self.sessionId)
-            self.mclient  = managementClient (self.conn.spec, None, self.configCb, self.instCb)
-            self.mchannel = self.mclient.addChannel (self.session)
-        except socket.error, e:
-            print "Socket Error %s - %s" % (e[0], e[1])
-            sys.exit (1)
-        except Closed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit (1)
-        except ConnectionFailed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit(1)
-
-    def setFilter(self,filter):
-        self.filter = filter
-
-    def Disconnect (self):
-        self.mclient.removeChannel (self.mchannel)
-        self.session.close(timeout=10)
-        self.conn.close(timeout=10)
-
-    def configCb (self, context, classKey, row, timestamps):
-        className = classKey[1]
-        if className != "queue":
-            return
-
-        obj = mgmtObject (classKey, timestamps, row)
-        if obj.id not in self.objects:
-            self.objects[obj.id] = (obj.name, None, None)
-
-    def instCb (self, context, classKey, row, timestamps):
-        className = classKey[1]
-        if className != "queue":
-            return
-
-        obj = mgmtObject (classKey, timestamps, row)
-        if obj.id not in self.objects:
-            return
-
-        (name, first, last) = self.objects[obj.id]
-        if first == None:
-            self.objects[obj.id] = (name, obj, None)
-            return
+class BrokerManager(Console):
+  def __init__(self, host):
+    self.url = host
+    self.objects = {}
+    self.filter  = None
+    self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, userBindings=True)
+    try:
+      self.broker  = self.session.addBroker(self.url)
+    except socket.error, e:
+      print "Socket Error %s - %s" % (e[0], e[1])
+      sys.exit (1)
+    except Closed, e:
+      print "Connect Failed %d - %s" % (e[0], e[1])
+      sys.exit (1)
+    except ConnectionFailed, e:
+      print "Connect Failed %d - %s" % (e[0], e[1])
+      sys.exit(1)
+
+  def setFilter(self,filter):
+    self.filter = filter
+
+  def objectProps(self, broker, record):
+    className = record.getClassKey()[1]
+    if className != "queue":
+      return
+
+    id = record.getObjectId().__repr__()
+    if id not in self.objects:
+      self.objects[id] = (record.name, None, None)
+
+  def objectStats(self, broker, record):
+    className = record.getClassKey()[1]
+    if className != "queue":
+      return
+
+    id = record.getObjectId().__repr__()
+    if id not in self.objects:
+      return
+
+    (name, first, last) = self.objects[id]
+    if first == None:
+      self.objects[id] = (name, record, None)
+      return
 
-        if len(self.filter) > 0 :
-           match = False
+    if len(self.filter) > 0 :
+      match = False
                     
-           for x in self.filter:
-              if x.match(name):                 
-                 match = True
-                 break
-           if match == False:
-              return
-
-        if last == None:
-            lastSample = first
-        else:
-            lastSample = last
-
-        self.objects[obj.id] = (name, first, obj)
-
-        deltaTime   = float (obj.timestamps[0] - lastSample.timestamps[0])
-        enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0)
-        dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0)
-        print "%-41s%10.2f%11d%13.2f%13.2f" % \
-            (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate)
-
-
-    def Display (self):
-        self.ConnectToBroker ()
-        print "Queue Name                                     Sec       Depth     Enq Rate     Deq Rate"
-        print "========================================================================================"
-        try:
-            while True:
-                sleep (1)
-        except KeyboardInterrupt:
-            pass
-        self.Disconnect ()
+      for x in self.filter:
+        if x.match(name):                 
+          match = True
+          break
+        if match == False:
+          return
+
+    if last == None:
+      lastSample = first
+    else:
+      lastSample = last
+
+    self.objects[id] = (name, first, record)
+
+    deltaTime   = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+    enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+        (deltaTime / 1000000000.0)
+    dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+        (deltaTime / 1000000000.0)
+    print "%-41s%10.2f%11d%13.2f%13.2f" % \
+        (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+
+
+  def Display (self):
+    classes = self.session.getClasses("org.apache.qpid.broker")
+    for cls in classes:
+      if cls[1] == "queue":
+        queueClassKey = cls
+    self.session.bindClass(queueClassKey)
+    print "Queue Name                                     Sec       Depth     Enq Rate     Deq Rate"
+    print "========================================================================================"
+    try:
+      while True:
+        sleep (1)
+    except KeyboardInterrupt:
+      print
+    self.session.delBroker(self.broker)
 
 ##
 ## Main Program
@@ -157,8 +133,7 @@
     for s in options.filter.split(","):
         filter.append(re.compile(s))
 
-  bm  = BrokerManager ()
-  bm.SetBroker (Broker (host))
+  bm = BrokerManager(host)
   bm.setFilter(filter)
   bm.Display()
  

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Oct 15 08:51:15 2008
@@ -162,8 +162,12 @@
 
     ssn.exchange_bind (exchange="amq.direct",
                        queue=self.replyName, binding_key=self.replyName)
-    ssn.message_subscribe (queue=self.topicName, destination="tdest")
-    ssn.message_subscribe (queue=self.replyName, destination="rdest")
+    ssn.message_subscribe (queue=self.topicName, destination="tdest",
+                           accept_mode=ssn.accept_mode.none,
+                           acquire_mode=ssn.acquire_mode.pre_acquired)
+    ssn.message_subscribe (queue=self.replyName, destination="rdest",
+                           accept_mode=ssn.accept_mode.none,
+                           acquire_mode=ssn.acquire_mode.pre_acquired)
 
     ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
     ssn.incoming ("rdest").listen (self.replyCb)
@@ -202,9 +206,6 @@
     if self.enabled:
       self.qpidChannel.message_transfer (destination=exchange, message=msg)
 
-  def accept (self, msg):
-    self.qpidChannel.message_accept(RangedSet(msg.id))
-
   def message (self, body, routing_key="broker"):
     dp = self.qpidChannel.delivery_properties()
     dp.routing_key  = routing_key
@@ -349,28 +350,27 @@
   def topicCb (self, ch, msg):
     """ Receive messages via the topic queue of a particular channel. """
     codec = Codec (self.spec, msg.body)
-    hdr   = self.checkHeader (codec)
-    if hdr == None:
-      raise ValueError ("outer header invalid");
+    while True:
+      hdr = self.checkHeader (codec)
+      if hdr == None:
+        return
 
-    if hdr[0] == 'p':
-      self.handlePackageInd (ch, codec)
-    elif hdr[0] == 'q':
-      self.handleClassInd (ch, codec)
-    elif hdr[0] == 'h':
-      self.handleHeartbeat (ch, codec)
-    elif hdr[0] == 'e':
-      self.handleEvent (ch, codec)
-    else:
-      self.parse (ch, codec, hdr[0], hdr[1])
-    ch.accept(msg)
+      if hdr[0] == 'p':
+        self.handlePackageInd (ch, codec)
+      elif hdr[0] == 'q':
+        self.handleClassInd (ch, codec)
+      elif hdr[0] == 'h':
+        self.handleHeartbeat (ch, codec)
+      elif hdr[0] == 'e':
+        self.handleEvent (ch, codec)
+      else:
+        self.parse (ch, codec, hdr[0], hdr[1])
 
   def replyCb (self, ch, msg):
     """ Receive messages via the reply queue of a particular channel. """
     codec = Codec (self.spec, msg.body)
     hdr   = self.checkHeader (codec)
     if hdr == None:
-      ch.accept(msg)
       return
 
     if   hdr[0] == 'm':
@@ -385,7 +385,6 @@
       self.handleClassInd (ch, codec)
     else:
       self.parse (ch, codec, hdr[0], hdr[1])
-    ch.accept(msg)
 
   def exceptCb (self, ch, data):
     if self.closeCb != None:
@@ -403,20 +402,22 @@
     codec.write_uint32  (seq)
 
   def checkHeader (self, codec):
-    """ Check the header of a management message and extract the opcode and
-    class. """
-    octet = chr (codec.read_uint8 ())
-    if octet != 'A':
-      return None
-    octet = chr (codec.read_uint8 ())
-    if octet != 'M':
-      return None
-    octet = chr (codec.read_uint8 ())
-    if octet != '2':
+    """ Check the header of a management message and extract the opcode and class. """
+    try:
+      octet = chr (codec.read_uint8 ())
+      if octet != 'A':
+        return None
+      octet = chr (codec.read_uint8 ())
+      if octet != 'M':
+        return None
+      octet = chr (codec.read_uint8 ())
+      if octet != '2':
+        return None
+      opcode = chr (codec.read_uint8 ())
+      seq    = codec.read_uint32 ()
+      return (opcode, seq)
+    except:
       return None
-    opcode = chr (codec.read_uint8 ())
-    seq    = codec.read_uint32 ()
-    return (opcode, seq)
 
   def encodeValue (self, codec, value, typecode):
     """ Encode, into the codec, a value based on its typecode. """

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=704944&r1=704943&r2=704944&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Wed Oct 15 08:51:15 2008
@@ -62,11 +62,11 @@
     """ Invoked when a QMF agent disconects. """
     pass
 
-  def objectProps(self, broker, id, record):
+  def objectProps(self, broker, record):
     """ Invoked when an object is updated. """
     pass
 
-  def objectStats(self, broker, id, record):
+  def objectStats(self, broker, record):
     """ Invoked when an object is updated. """
     pass
 
@@ -111,10 +111,10 @@
   _CONTEXT_STARTUP  = 2
   _CONTEXT_MULTIGET = 3
 
-  GET_WAIT_TIME = 10
+  GET_WAIT_TIME = 60
 
   def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
-               manageConnections=False):
+               manageConnections=False, userBindings=False):
     """
     Initialize a session.  If the console argument is provided, the
     more advanced asynchronous features are available.  If console is
@@ -131,6 +131,12 @@
 
     If manageConnections is set to False, the user is responsible for handing failures.  In
     this case, an unreachable broker will cause addBroker to raise an exception.
+
+    If userBindings is set to False (the default) and rcvObjects is True, the console will
+    receive data for all object classes.  If userBindings is set to True, the user must select
+    which classes the console shall receive by invoking the bindPackage or bindClass methods.
+    This allows the console to be configured to receive only information that is relavant to
+    a particular application.  If rcvObjects id False, userBindings has no meaning.
     """
     self.console           = console
     self.brokers           = []
@@ -141,14 +147,21 @@
     self.getResult         = []
     self.getSelect         = []
     self.error             = None
+    self.rcvObjects        = rcvObjects
+    self.rcvEvents         = rcvEvents
+    self.rcvHeartbeats     = rcvHeartbeats
+    self.userBindings      = userBindings
     if self.console == None:
-      rcvObjects    = False
-      rcvEvents     = False
-      rcvHeartbeats = False
-    self.bindingKeyList    = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats)
+      self.rcvObjects    = False
+      self.rcvEvents     = False
+      self.rcvHeartbeats = False
+    self.bindingKeyList    = self._bindingKeys()
     self.manageConnections = manageConnections
 
-    if (manageConnections):
+    if self.userBindings and not self.rcvObjects:
+      raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
+    if manageConnections:
       raise Exception("manageConnections - not yet implemented")
 
   def __repr__(self):
@@ -200,6 +213,23 @@
       if (cname, hash) in self.packages[pname]:
         return self.packages[pname][(cname, hash)]
 
+  def bindPackage(self, packageName):
+    """ """
+    if not self.userBindings or not self.rcvObjects:
+      raise Exception("userBindings option not set for Session")
+    for broker in self.brokers:
+      broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+                                       binding_key="console.obj.%s" % packageName)
+
+  def bindClass(self, classKey):
+    """ """
+    if not self.userBindings or not self.rcvObjects:
+      raise Exception("userBindings option not set for Session")
+    pname, cname, hash = classKey
+    for broker in self.brokers:
+      broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+                                       binding_key="console.obj.%s.%s" % (pname, cname))
+
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
     brokerList = []
@@ -322,20 +352,19 @@
     """ """
     pass
 
-  def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats):
+  def _bindingKeys(self):
     keyList = []
     keyList.append("schema.#")
-    if rcvObjects and rcvEvents and rcvHeartbeats:
+    if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
       keyList.append("console.#")
     else:
-      if rcvObjects:
-        keyList.append("console.prop.#")
-        keyList.append("console.stat.#")
+      if self.rcvObjects and not self.userBindings:
+        keyList.append("console.obj.#")
       else:
-        keyList.append("console.prop.org.apache.qpid.broker.agent")
-      if rcvEvents:
+        keyList.append("console.obj.org.apache.qpid.broker.agent")
+      if self.rcvEvents:
         keyList.append("console.event.#")
-      if rcvHeartbeats:
+      if self.rcvHeartbeats:
         keyList.append("console.heartbeat")
     return keyList
 
@@ -488,9 +517,9 @@
 
     if self.console != None:
       if prop:
-        self.console.objectProps(broker, object.getObjectId(), object)
+        self.console.objectProps(broker, object)
       if stat:
-        self.console.objectStats(broker, object.getObjectId(), object)
+        self.console.objectStats(broker, object)
 
   def _handleError(self, error):
     self.error = error
@@ -858,6 +887,15 @@
   def getStatistics(self):
     return self._statistics
 
+  def mergeUpdate(self, newer):
+    """ Replace properties and/or statistics with a newly received update """
+    if self._objectId != newer._objectId:
+      raise Exception("Objects with different object-ids")
+    if len(newer.getProperties()) > 0:
+      self.properties = newer.getProperties()
+    if len(newer.getStatistics()) > 0:
+      self.statistics = newer.getStatistics()
+
   def __repr__(self):
     return self.getIndex()
 
@@ -960,7 +998,7 @@
 
 class Broker:
   """ """
-  SYNC_TIME = 10
+  SYNC_TIME = 60
 
   def __init__(self, session, host, port, authMech, authUser, authPass):
     self.session  = session
@@ -1024,7 +1062,9 @@
       self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
       self.amqpSession.exchange_bind(exchange="amq.direct",
                                      queue=self.replyName, binding_key=self.replyName)
-      self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest")
+      self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
+                                         accept_mode=self.amqpSession.accept_mode.none,
+                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
       self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
       self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
       self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF)
@@ -1032,7 +1072,9 @@
 
       self.topicName = "topic-%s" % self.amqpSessionId
       self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
-      self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest")
+      self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
+                                         accept_mode=self.amqpSession.accept_mode.none,
+                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
       self.amqpSession.incoming("tdest").listen(self._replyCb)
       self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
       self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
@@ -1076,18 +1118,21 @@
 
   def _checkHeader(self, codec):
     """ Check the header of a management message and extract the opcode and class. """
-    octet = chr(codec.read_uint8())
-    if octet != 'A':
-      return None, None
-    octet = chr(codec.read_uint8())
-    if octet != 'M':
-      return None, None
-    octet = chr(codec.read_uint8())
-    if octet != '2':
+    try:
+      octet = chr(codec.read_uint8())
+      if octet != 'A':
+        return None, None
+      octet = chr(codec.read_uint8())
+      if octet != 'M':
+        return None, None
+      octet = chr(codec.read_uint8())
+      if octet != '2':
+        return None, None
+      opcode = chr(codec.read_uint8())
+      seq    = codec.read_uint32()
+      return opcode, seq
+    except:
       return None, None
-    opcode = chr(codec.read_uint8())
-    seq    = codec.read_uint32()
-    return opcode, seq
 
   def _message (self, body, routing_key="broker"):
     dp = self.amqpSession.delivery_properties()
@@ -1143,23 +1188,21 @@
     self.cv.release()
 
   def _replyCb(self, msg):
-    self.amqpSession.message_accept(RangedSet(msg.id))
     codec = Codec(self.conn.spec, msg.body)
-    opcode, seq = self._checkHeader(codec)
-    if opcode == None:
-      return
-
-    if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
-    elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
-    elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
-    elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
-    elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
-    elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq)
-    elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
-    elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
-    elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
-    elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
-    elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
+    while True:
+      opcode, seq = self._checkHeader(codec)
+      if   opcode == None: return
+      if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
+      elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
+      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+      elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
+      elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
+      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq)
+      elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
+      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
+      elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
+      elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
+      elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
 
   def _exceptionCb(self, data):
     self.isConnected = False
@@ -1286,10 +1329,10 @@
   def delAgent(self, agent):
     print "delAgent:", agent
 
-  def objectProps(self, broker, id, record):
+  def objectProps(self, broker, record):
     print "objectProps:", record.getClassKey()
 
-  def objectStats(self, broker, id, record):
+  def objectStats(self, broker, record):
     print "objectStats:", record.getClassKey()
 
   def event(self, broker, event):