You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/23 22:21:17 UTC

svn commit: r926788 - in /qpid/branches/qmf-devel0.7a/qpid: cpp/examples/qmf-agent/ cpp/include/qpid/management/ cpp/managementgen/qmfgen/ cpp/src/qpid/agent/ extras/qmf/src/py/qmf/

Author: tross
Date: Tue Mar 23 21:21:17 2010
New Revision: 926788

URL: http://svn.apache.org/viewvc?rev=926788&view=rev
Log:
Checkpointing updates for the Python console.
  - Added 'list' type for QMF.
  - Updated qmf-agent example to use the new string formats for agent name and object-id.
  - Major updates in Python qmf.console to handle dual-mode operation.

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/schema.xml
    qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/management-types.xml
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp Tue Mar 23 21:21:17 2010
@@ -98,7 +98,7 @@ CoreClass::CoreClass(ManagementAgent* _a
     static uint64_t persistId = 0x111222333444555LL;
     mgmtObject = new _qmf::Parent(agent, this, name);
 
-    agent->addObject(mgmtObject, persistId++);
+    agent->addObject(mgmtObject);
     mgmtObject->set_state("IDLE");
 
     Variant::Map args;
@@ -109,6 +109,11 @@ CoreClass::CoreClass(ManagementAgent* _a
     subMap["numeric-data"] = 10000;
     args["map-data"] = subMap;
     mgmtObject->set_args(args);
+
+    Variant::List list;
+    list.push_back(20000);
+    list.push_back("string-item");
+    mgmtObject->set_list(list);
 }
 
 void CoreClass::doLoop()
@@ -190,7 +195,7 @@ int main_int(int argc, char** argv)
     _qmf::Package packageInit(agent);
 
     // Name the agent.
-    agent->setName("apache.org", "qmf-example");
+    agent->setName("apache.org", "qmf-example", "A");
 
     // Start the agent.  It will attempt to make a connection to the
     // management broker

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/schema.xml?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/schema.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/schema.xml Tue Mar 23 21:21:17 2010
@@ -30,6 +30,7 @@
 
     <property name="name"      type="sstr" access="RC" index="y"/>
     <property name="args"      type="map"  access="RO"/>
+    <property name="list"      type="list" access="RO"/>
 
     <statistic name="state" type="sstr"                desc="Operational state of the link"/>
     <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h Tue Mar 23 21:21:17 2010
@@ -97,6 +97,7 @@ public:
     static const uint8_t TYPE_S16       = 17;
     static const uint8_t TYPE_S32       = 18;
     static const uint8_t TYPE_S64       = 19;
+    static const uint8_t TYPE_LIST      = 21;
 
     static const uint8_t ACCESS_RC = 1;
     static const uint8_t ACCESS_RW = 2;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/management-types.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/management-types.xml?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/management-types.xml (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/management-types.xml Tue Mar 23 21:21:17 2010
@@ -41,6 +41,7 @@
 <type name="double"    base="DOUBLE"    cpp="double"        encode="@.putDouble(#)"      decode="# = @.getDouble()"    stream="#" size="8" accessor="direct" init="0."/>
 <type name="uuid"      base="UUID"      cpp="::qpid::messaging::Uuid" encode="#.encode(@)"       decode="#.decode(@)"  stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" />
 <type name="map"       base="FTABLE"    cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)"  stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/>
+<type name="list"      base="LIST"      cpp="::qpid::messaging::Variant::List" encode="#.encode(@)" decode="#.decode(@)"  stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::Variant::List()" byRef="y" unmap="::qpid::messaging::Variant::List(); assert(false); /*TBD*/"/>
 
 <type name="hilo8"   base="U8"   cpp="uint8_t"  encode="@.putOctet(#)"    decode="# = @.getOctet()"    style="wm" stream="#" size="1" accessor="counter" init="0"/>
 <type name="hilo16"  base="U16"  cpp="uint16_t" encode="@.putShort(#)"    decode="# = @.getShort()"    style="wm" stream="#" size="2" accessor="counter" init="0"/>

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Mar 23 21:21:17 2010
@@ -156,7 +156,7 @@ void ManagementAgentImpl::init(const str
 void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
                                uint16_t intervalSeconds,
                                bool useExternalThread,
-                               const std::string& _storeFile)
+                               const string& _storeFile)
 {
     interval     = intervalSeconds;
     extThread    = useExternalThread;
@@ -448,7 +448,7 @@ void ManagementAgentImpl::handleAttachRe
     }
 }
 
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
 {
     Mutex::ScopedLock lock(agentLock);
     string packageName;
@@ -468,14 +468,14 @@ void ManagementAgentImpl::handleSchemaRe
             SchemaClass& schema = cIter->second;
             Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
-            std::string body;
+            string   body;
 
             encodeHeader(outBuffer, 's', sequence);
             schema.writeSchemaCall(body);
             outBuffer.putRawData(body);
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
 
             QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
         }
@@ -507,7 +507,7 @@ void ManagementAgentImpl::invokeMethodRe
         (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
         failed = true;
     } else {
-        std::string methodName;
+        string methodName;
         ObjectId objId;
         qpid::messaging::Variant::Map inArgs;
 
@@ -738,7 +738,7 @@ void ManagementAgentImpl::received(Messa
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
         if      (opcode == 'a') handleAttachResponse(inBuffer);
-        else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+        else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
         else if (opcode == 'x') handleConsoleAddedIndication();
             QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
     }
@@ -754,15 +754,15 @@ void ManagementAgentImpl::encodeHeader(B
     buf.putLong (seq);
 }
 
-qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname,
-                                                                     const std::string& cname,
+qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+                                                                     const string& cname,
                                                                      const uint8_t *md5Sum)
 {
     qpid::messaging::Variant::Map map_;
 
     map_["_package_name"] = pname;
     map_["_class_name"] = cname;
-    map_["_hash_str"] = messaging::Uuid((const char*) md5Sum);
+    map_["_hash_str"] = messaging::Uuid(md5Sum);
     return map_;
 }
 
@@ -922,7 +922,10 @@ void ManagementAgentImpl::periodicProces
                 if (send_stats || send_props) {
                     ::qpid::messaging::Variant::Map map_;
                     ::qpid::messaging::Variant::Map values;
+                    ::qpid::messaging::Variant::Map oid;
 
+                    object->getObjectId().mapEncode(oid);
+                    map_["_object_id"] = oid;
                     map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
                                                            object->getClassName(),
                                                            object->getMd5Sum());
@@ -938,7 +941,7 @@ void ManagementAgentImpl::periodicProces
         }
 
         content.encode();
-        const std::string &str = m.getContent();
+        const string &str = m.getContent();
         if (str.length()) {
             ::qpid::messaging::Variant::Map  headers;
             headers["method"] = "indication";
@@ -1105,6 +1108,7 @@ void ManagementAgentImpl::ConnectionThre
 
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+    msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
     try {
         session.messageTransfer(arg::content=msg, arg::destination=exchange);
     } catch(exception& e) {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Tue Mar 23 21:21:17 2010
@@ -264,7 +264,7 @@ class ManagementAgentImpl : public Manag
     void handleAttachResponse (qpid::framing::Buffer& inBuffer);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
-    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
     void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo);
 
     void handleGetQuery       (const std::string& body, const std::string& content_type,

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=926788&r1=926787&r2=926788&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Tue Mar 23 21:21:17 2010
@@ -119,10 +119,16 @@ class Object(object):
   """ This class defines a 'proxy' object representing a real managed object on an agent.
       Actions taken on this proxy are remotely affected on the real managed object.
   """
-  def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
+  def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}):
     self._session = session
     self._broker  = broker
     self._schema  = schema
+    self._properties  = []
+    self._statistics  = []
+    if v2Map:
+      self.v2Init(v2Map, agentName)
+      return
+
     self._managed = managed
     if self._managed:
       self._currentTime = codec.read_uint64()
@@ -134,8 +140,6 @@ class Object(object):
       self._createTime  = None
       self._deleteTime  = None
       self._objectId    = None
-    self._properties  = []
-    self._statistics  = []
     if codec:
       if prop:
         notPresent = self._parsePresenceMasks(codec, schema)
@@ -156,6 +160,27 @@ class Object(object):
       for statistic in schema.getStatistics():
           self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
 
+  def v2Init(self, omap, agentName):
+    if omap.__class__ != dict:
+      raise Exception("QMFv2 object data must be a map/dict")
+    if '_values' not in omap:
+      raise Exception("QMFv2 object must have '_values' element")
+
+    values = omap['_values']
+    for prop in self._schema.getProperties():
+      if prop.name in values:
+        self._properties.append((prop, values[prop.name]))
+    for stat in self._schema.getStatistics():
+      if stat.name in values:
+        self._statistics.append((stat, values[stat.name]))
+    if '_subtypes' in omap:
+      self._subtypes = omap['_subtypes']
+    if '_object_id' in omap:
+      self._managed = True
+      self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
+    else:
+      self._managed = None
+
   def getBroker(self):
     """ Return the broker from which this object was sent """
     return self._broker
@@ -244,17 +269,17 @@ class Object(object):
     for method in self._schema.getMethods():
       if name == method.name:
         return lambda *args, **kwargs : self._invoke(name, args, kwargs)
-    for property, value in self._properties:
-      if name == property.name:
+    for prop, value in self._properties:
+      if name == prop.name:
         return value
-      if name == "_" + property.name + "_" and property.type == 10:  # Dereference references
+      if name == "_" + prop.name + "_" and prop.type == 10:  # Dereference references
         deref = self._session.getObjects(_objectId=value, _broker=self._broker)
         if len(deref) != 1:
           return None
         else:
           return deref[0]
-    for statistic, value in self._statistics:
-      if name == statistic.name:
+    for stat, value in self._statistics:
+      if name == stat.name:
         return value
     raise Exception("Type Object has no attribute '%s'" % name)
 
@@ -449,7 +474,7 @@ class Session:
     """
     self.console           = console
     self.brokers           = []
-    self.packages          = {}
+    self.schemaCache       = SchemaCache()
     self.seqMgr            = SequenceManager()
     self.cv                = Condition()
     self.syncSequenceList  = []
@@ -470,6 +495,134 @@ class Session:
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
+    """
+    ##
+    ## v2_data_queues is used to store object data received from QMFv2 agents.
+    ## It is stored here in case we need to go and query schema data from the
+    ## agent before reporting to the user.
+    ##
+    ## v2_data_queues is a map, keyed by agent address, of queues of entries.
+    ## Each entry in a queue is a data map.
+    ## This map must be protected by self.cv.
+    ##
+    """
+    self.v2_data_queues = {}
+    self.v2_pending_queues = {}
+
+  def _getBrokerForAgentAddr(self, agent_addr):
+    broker = None
+    try:
+      self.cv.acquire()
+      key = (1, agent_addr)
+      for b in self.brokers:
+        if key in b.agents:
+          broker = b
+    finally:
+      self.cv.release()
+    return broker
+
+  def _processV2Data(self):
+    """
+    Attempt to make progress on the entries in v2_data_queues.  If an entry has a schema
+    that is in our schema cache, process it.  Otherwise, send a request for the schema information
+    to the agent that manages the object.
+    """
+    try:
+      self.cv.acquire()
+      pop_list = []
+      for agent_addr in self.v2_data_queues:
+        entries = self.v2_data_queues[agent_addr]
+        keep_going = True
+        while keep_going and len(entries) > 0:
+          schemaId = self._getSchemaIdforV2ObjectLH(entries[0])
+          schema = self.schemaCache.getSchema(schemaId)
+          if schema:
+            broker = self._getBrokerForAgentAddr(agent_addr)
+            obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr)
+            entries.pop(0)
+
+            """
+            TODO:  The following code assumes that the data indication came unsolicited.
+                   This needs to be enhanced to handle the case of a query response.
+            """
+            if self.console:
+              self.console.objectProps(broker, obj)
+
+          else:
+            """
+            We have no schema for this data object, move the queue to the pending map and request
+            schema data from the agent
+            """
+            self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr]
+            pop_list.append(agent_addr)
+            self._v2SendSchemaRequest(agent_addr, schemaId)
+            keep_going = None
+      for agent_addr in pop_list:
+        self.v2_data_queues.pop(agent_addr)
+    finally:
+      self.cv.release()
+
+  def _addV2Data(self, agent_addr, data_map):
+    """
+    Add data-for-processing to the work queue
+    """
+    process = None
+    try:
+      self.cv.acquire()
+      if agent_addr in self.v2_pending_queues:
+        self.v2_pending_queues[agent_addr].append(data_map)
+      else:
+        if agent_addr not in self.v2_data_queues:
+          self.v2_data_queues[agent_addr] = []
+        self.v2_data_queues[agent_addr].append(data_map)
+        process = True
+    finally:
+      self.cv.release()
+
+    if process:
+      self._processV2Data()
+
+  def _removeV2Agent(self, agent):
+    """
+    Remove entries in the data queues related to a lost agent.
+    """
+    agent_name = agent.getAgentBank()
+    try:
+      self.cv.acquire()
+      if agent_name in self.v2_data_queues:
+        self.v2_data_queues.pop(agent_name)
+      if agent_name in self.v2_pending_queues:
+        self.v2_pending_queues.pop(agent_name)
+    finally:
+      self.cv.release()
+
+  def _schemaInfoFromV2Agent(self, agent_addr):
+    """
+    We have just received new schema information from an agent.  Check to see if there's
+    more work that can now be done.
+    """
+    re_process = None
+    try:
+      self.cv.acquire()
+      if agent_addr in self.v2_pending_queues:
+        self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr)
+        re_process = True
+    finally:
+      self.cv.release()
+
+    if re_process:
+      self._processV2Data()
+
+  def _getSchemaIdforV2ObjectLH(self, data):
+    """
+    Given a data map, extract the schema-identifier.
+    """
+    if data.__class__ != dict:
+      return None
+    if '_schema_id' in data:
+      return ClassKey(data['_schema_id'])
+    return None
+
   def __repr__(self):
     return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
 
@@ -489,6 +642,7 @@ class Session:
     returned from the addBroker call """
     if self.console:
       for agent in broker.getAgents():
+        self._removeV2Agent(agent)  # drop queued QMFv2 data for this agent (Session helper, not a console callback)
         self.console.delAgent(agent)
     broker._shutdown()
     self.brokers.remove(broker)
@@ -498,30 +652,19 @@ class Session:
     """ Get the list of known QMF packages """
     for broker in self.brokers:
       broker._waitForStable()
-    list = []
-    for package in self.packages:
-      list.append(package)
-    return list
+    return self.schemaCache.getPackages()
 
   def getClasses(self, packageName):
     """ Get the list of known classes within a QMF package """
     for broker in self.brokers:
       broker._waitForStable()
-    list = []
-    if packageName in self.packages:
-      for pkey in self.packages[packageName]:
-        list.append(self.packages[packageName][pkey].getKey())
-    return list
+    return self.schemaCache.getClasses(packageName)
 
   def getSchema(self, classKey):
     """ Get the schema for a QMF class """
     for broker in self.brokers:
       broker._waitForStable()
-    pname = classKey.getPackageName()
-    pkey = classKey.getPackageKey()
-    if pname in self.packages:
-      if pkey in self.packages[pname]:
-        return self.packages[pname][pkey]
+    return self.schemaCache.getSchema(classKey)
 
   def bindPackage(self, packageName):
     """ Request object updates for all table classes within a package. """
@@ -743,6 +886,7 @@ class Session:
   def _handleBrokerDisconnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
+        self._removeV2Agent(agent)  # self is the Session; drop queued QMFv2 data for the lost agent
         self.console.delAgent(agent)
       self.console.brokerDisconnected(broker)
 
@@ -761,14 +905,7 @@ class Session:
 
   def _handlePackageInd(self, broker, codec, seq):
     pname = str(codec.read_str8())
-    notify = False
-    try:
-      self.cv.acquire()
-      if pname not in self.packages:
-        self.packages[pname] = {}
-        notify = True
-    finally:
-      self.cv.release()
+    notify = self.schemaCache.declarePackage(pname)
     if notify and self.console != None:
       self.console.newPackage(pname)
 
@@ -806,17 +943,9 @@ class Session:
   def _handleClassInd(self, broker, codec, seq):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
-    unknown = False
+    schema = self.schemaCache.getSchema(classKey)
 
-    try:
-      self.cv.acquire()
-      if classKey.getPackageName() in self.packages:
-        if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
-          unknown = True
-    finally:
-      self.cv.release()
-
-    if unknown:
+    if not schema:
       # Send a schema request for the unknown class
       broker._incOutstanding()
       sendCodec = Codec()
@@ -879,37 +1008,27 @@ class Session:
       event = Event(self, broker, codec)
       self.console.event(broker, event)
 
-  def _handleSchemaResp(self, broker, codec, seq):
+  def _handleSchemaResp(self, broker, codec, seq, agent_addr):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
     _class = SchemaClass(kind, classKey, codec, self)
-    try:
-      self.cv.acquire()
-      self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
-    finally:
-      self.cv.release()
-      
+    self.schemaCache.declareClass(classKey, _class)
     self.seqMgr._release(seq)
     broker._decOutstanding()
     if self.console != None:
       self.console.newClass(kind, classKey)
 
+    if agent_addr:
+      self._schemaInfoFromV2Agent(agent_addr)
+
   def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
     classKey = ClassKey(codec)
-    try:
-      self.cv.acquire()
-      pname = classKey.getPackageName()
-      if pname not in self.packages:
-        return
-      pkey = classKey.getPackageKey()
-      if pkey not in self.packages[pname]:
-        return
-      schema = self.packages[pname][pkey]
-    finally:
-      self.cv.release()
+    schema = self.schemaCache.getSchema(classKey)
+    if not schema:
+      return
 
     object = Object(self, broker, schema, codec, prop, stat)
-    if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+    if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
       broker._updateAgent(object)
 
     try:
@@ -949,7 +1068,15 @@ class Session:
     self._v2HandleHeartbeatInd(broker, mp, ah, content)
 
   def _v2HandleDataInd(self, broker, mp, ah, content):
-    pass
+    kind = "_data"
+    if "qmf.content" in ah:
+      kind = ah["qmf.content"]
+    agent_addr = ah["qmf.agent"]
+    if content.__class__ != list:
+      return
+    if kind == "_data":
+      for omap in content:
+        self._addV2Data(agent_addr, omap)
 
   def _v2HandleQueryRsp(self, broker, mp, ah, content):
     pass
@@ -960,6 +1087,24 @@ class Session:
   def _v2HandleException(self, broker, mp, ah, content):
     pass
 
+  def _v2SendSchemaRequest(self, agent_addr, schemaId):
+    """
+    Send a query to an agent to request details on a particular schema class.
+    IMPORTANT:  This function currently sends a QMFv1 schema-request to the address of
+                the agent.  The agent will send its response to amq.direct/<our-key>.
+                Eventually, this will be converted to a proper QMFv2 schema query.
+    """
+    broker = self._getBrokerForAgentAddr(agent_addr)
+    if not broker:
+      return
+
+    sendCodec = Codec()
+    seq = self.seqMgr._reserve(None)
+    broker._setHeader(sendCodec, 'S', seq)
+    schemaId.encode(sendCodec)
+    smsg = broker._message(sendCodec.encoded, agent_addr)
+    broker._send(smsg, "qmf.default.direct")
+
   def _handleError(self, error):
     try:
       self.cv.acquire()
@@ -1004,17 +1149,9 @@ class Session:
       inner_type_code = codec.read_uint8()
       if inner_type_code == 20:
           classKey = ClassKey(codec)
-          try:
-            self.cv.acquire()
-            pname = classKey.getPackageName()
-            if pname not in self.packages:
-              return None
-            pkey = classKey.getPackageKey()
-            if pkey not in self.packages[pname]:
-              return None
-            schema = self.packages[pname][pkey]
-          finally:
-            self.cv.release()
+          schema = self.schemaCache.getSchema(classKey)
+          if not schema:
+            return None
           data = Object(self, broker, schema, codec, True, True, False)
       else:
           data = self._decodeValue(codec, inner_type_code, broker)
@@ -1206,15 +1343,86 @@ class Session:
         return seq
     return None
 
-class Package:
-  """ """
-  def __init__(self, name):
-    self.name = name
+class SchemaCache(object):
+  """
+  The SchemaCache is a data structure that stores learned schema information.
+  """
+  def __init__(self):
+    """
+    Create a map of schema packages and a lock to protect this data structure.
+    Note that this lock is at the bottom of any lock hierarchy.  If it is held, no other
+    lock in the system should attempt to be acquired.
+    """
+    self.packages = {}
+    self.lock = Lock()
+
+  def getPackages(self):
+    """ Get the list of known QMF packages """
+    list = []
+    try:
+      self.lock.acquire()
+      for package in self.packages:
+        list.append(package)
+    finally:
+      self.lock.release()
+    return list
+
+  def getClasses(self, packageName):
+    """ Get the list of known classes within a QMF package """
+    list = []
+    try:
+      self.lock.acquire()
+      if packageName in self.packages:
+        for pkey in self.packages[packageName]:
+          list.append(self.packages[packageName][pkey].getKey())
+    finally:
+      self.lock.release()
+    return list
+
+  def getSchema(self, classKey):
+    """ Get the schema for a QMF class """
+    pname = classKey.getPackageName()
+    pkey = classKey.getPackageKey()
+    try:
+      self.lock.acquire()
+      if pname in self.packages:
+        if pkey in self.packages[pname]:
+          return self.packages[pname][pkey]
+    finally:
+      self.lock.release()
+    return None
+
+  def declarePackage(self, pname):
+    """ Maybe add a package to the cache.  Return True if package was added, None if it pre-existed. """
+    try:
+      self.lock.acquire()
+      if pname in self.packages:
+        return None
+      self.packages[pname] = {}
+    finally:
+      self.lock.release()
+    return True
+
+  def declareClass(self, classKey, classDef):
+    """ Maybe add a class definition to the cache.  Return True if added, None if pre-existed. """
+    pname = classKey.getPackageName()
+    pkey = classKey.getPackageKey()
+    try:
+      self.lock.acquire()
+      if pname not in self.packages:
+        self.packages[pname] = {}
+      packageMap = self.packages[pname]
+      if pkey in packageMap:
+        return None
+      packageMap[pkey] = classDef
+    finally:
+      self.lock.release()
+    return True
 
 class ClassKey:
   """ A ClassKey uniquely identifies a class from the schema. """
   def __init__(self, constructor):
-    if type(constructor) == str:
+    if constructor.__class__ == str:
       # construct from __repr__ string
       try:
         self.pname, cls = constructor.split(":")
@@ -1225,20 +1433,30 @@ class ClassKey:
         h1 = int(hexValues[1], 16)
         h2 = int(hexValues[2], 16)
         h3 = int(hexValues[3], 16)
-        self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
+        h4 = int(hexValues[4][0:4], 16)
+        h5 = int(hexValues[4][4:12], 16)
+        self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5))
       except:
         raise Exception("Invalid ClassKey format")
+    elif constructor.__class__ == dict:
+      # construct from QMFv2 map
+      try:
+        self.pname = constructor['_package_name']
+        self.cname = constructor['_class_name']
+        self.hash  = constructor['_hash_str']
+      except:
+        raise Exception("Invalid ClassKey map format")
     else:
       # construct from codec
       codec = constructor
       self.pname = str(codec.read_str8())
       self.cname = str(codec.read_str8())
-      self.hash  = codec.read_bin128()
+      self.hash  = UUID(codec.read_bin128())
 
   def encode(self, codec):
     codec.write_str8(self.pname)
     codec.write_str8(self.cname)
-    codec.write_bin128(self.hash)
+    codec.write_bin128(self.hash.bytes)
 
   def getPackageName(self):
     return self.pname
@@ -1250,7 +1468,7 @@ class ClassKey:
     return self.hash
 
   def getHashString(self):
-    return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
+    return str(self.hash)
 
   def getPackageKey(self):
     return (self.cname, self.hash)
@@ -1442,61 +1660,93 @@ class SchemaArgument:
 
 class ObjectId:
   """ Object that represents QMF object identifiers """
-  def __init__(self, codec, first=0, second=0):
-    if codec:
-      self.first  = codec.read_uint64()
-      self.second = codec.read_uint64()
-    else:
-      self.first = first
-      self.second = second
+  def __init__(self, constructor, first=0, second=0, agentName=None):
+    if  constructor.__class__ == dict:
+      self.agentName = agentName
+      self.agentEpoch = 0
+      if '_agent_name' in constructor:  self.agentName = constructor['_agent_name']
+      if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch']
+      if '_object_name' not in constructor:
+        raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.")
+      self.objectName = constructor['_object_name']
+    else:
+      if not constructor:
+        first = first
+        second = second
+      else:
+        first  = constructor.read_uint64()
+        second = constructor.read_uint64()
+      self.agentName = str((first & 0x0000FFFFF0000000) >> 28)
+      self.agentEpoch = (first & 0x0FFF000000000000) >> 48
+      self.objectName = str(second)
 
   def __cmp__(self, other):    
     if other == None or not isinstance(other, ObjectId) :
       return 1
-    if self.first < other.first:
+
+    if self.objectName < other.objectName:
+      return -1
+    if self.objectName > other.objectName:
+      return 1
+
+    if self.agentName < other.agentName:
       return -1
-    if self.first > other.first:
+    if self.agentName > other.agentName:
       return 1
-    if self.second < other.second:
+
+    if self.agentEpoch < other.agentEpoch:
       return -1
-    if self.second > other.second:
+    if self.agentEpoch > other.agentEpoch:
       return 1
     return 0
 
   def __repr__(self):
-    return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
+    return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
                                self.getBrokerBank(), self.getAgentBank(), self.getObject())
 
   def index(self):
-    return (self.first, self.second)
+    return self.__repr__()
 
   def getFlags(self):
-    return (self.first & 0xF000000000000000) >> 60
+    return 0
 
   def getSequence(self):
-    return (self.first & 0x0FFF000000000000) >> 48
+    return self.agentEpoch
 
   def getBrokerBank(self):
-    return (self.first & 0x0000FFFFF0000000) >> 28
+    return 1
 
   def getAgentBank(self):
-    return self.first & 0x000000000FFFFFFF
+    return self.agentName
 
   def getObject(self):
-    return self.second
+    return self.objectName
 
   def isDurable(self):
     return self.getSequence() == 0
 
   def encode(self, codec):
-    codec.write_uint64(self.first)
-    codec.write_uint64(self.second)
+    first = self.agentEpoch << 48
+    second = 0
+
+    try:
+      first += int(self.agentName) << 28
+    except:
+      pass
+
+    try:
+      second = int(self.objectName)
+    except:
+      pass
+
+    codec.write_uint64(first)
+    codec.write_uint64(second)
 
   def __hash__(self):
-    return (self.first, self.second).__hash__()
+    return self.__repr__().__hash__()
 
   def __eq__(self, other):
-    return (self.first, self.second).__eq__(other)
+    return self.__repr__().__eq__(other)
 
 class MethodResult(object):
   """ """
@@ -1590,7 +1840,6 @@ class Broker:
     self.authUser = authUser
     self.authPass = authPass
     self.cv = Condition()
-    self.agentLock = Lock()
     self.error = None
     self.brokerId = None
     self.connected = False
@@ -1625,11 +1874,11 @@ class Broker:
     """ Return the agent object associated with a particular broker and agent bank value."""
     bankKey = (brokerBank, agentBank)
     try:
-      self.agentLock.acquire()
+      self.cv.acquire()
       if bankKey in self.agents:
         return self.agents[bankKey]
     finally:
-      self.agentLock.release()
+      self.cv.release()
     return None
 
   def getSessionId(self):
@@ -1639,10 +1888,10 @@ class Broker:
   def getAgents(self):
     """ Get the list of agents reachable via this broker """
     try:
-      self.agentLock.acquire()
+      self.cv.acquire()
       return self.agents.values()
     finally:
-      self.agentLock.release()
+      self.cv.release()
 
   def getAmqpSession(self):
     """ Get the AMQP session object for this connected broker. """
@@ -1672,11 +1921,11 @@ class Broker:
   def _tryToConnect(self):
     try:
       try:
-        self.agentLock.acquire()
+        self.cv.acquire()
         self.agents = {}
         self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
       finally:
-        self.agentLock.release()
+        self.cv.release()
 
       self.topicBound = False
       self.syncInFlight = False
@@ -1768,35 +2017,35 @@ class Broker:
     agent = None
     if obj._deleteTime == 0:
       try:
-        self.agentLock.acquire()
+        self.cv.acquire()
         if bankKey not in self.agents:
           agent = Agent(self, obj.agentBank, obj.label)
           self.agents[bankKey] = agent
       finally:
-        self.agentLock.release()
+        self.cv.release()
       if agent and self.session.console:
         self.session.console.newAgent(agent)
     else:
       try:
-        self.agentLock.acquire()
+        self.cv.acquire()
         agent = self.agents.pop(bankKey, None)
       finally:
-        self.agentLock.release()
+        self.cv.release()
       if agent and self.session.console:
         self.session.console.delAgent(agent)
 
   def _addAgent(self, name, agent):
     try:
-      self.agentLock.acquire()
+      self.cv.acquire()
       self.agents[(1, name)] = agent
     finally:
-      self.agentLock.release()
+      self.cv.release()
     if self.session.console:
       self.session.console.newAgent(agent)
 
   def _ageAgents(self):
     try:
-      self.agentLock.acquire()
+      self.cv.acquire()
       to_delete = []
       to_notify = []
       for key in self.agents:
@@ -1805,12 +2054,16 @@ class Broker:
       for key in to_delete:
         to_notify.append(self.agents.pop(key, None))
     finally:
-      self.agentLock.release()
+      self.cv.release()
     if self.session.console:
       for agent in to_notify:
+        self.session._removeV2Agent(agent)
         self.session.console.delAgent(agent)
 
   def _v2SendAgentLocate(self, predicate={}):
+    """
+    Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
+    """
     dp = self.amqpSession.delivery_properties()
     dp.routing_key = "console.request.agent_locate"
     mp = self.amqpSession.message_properties()
@@ -1915,6 +2168,11 @@ class Broker:
       self.cv.release()
 
   def _replyCb(self, msg):
+    agent_addr = None
+    mp = msg.get("message_properties")
+    ah = mp.application_headers
+    if ah and 'qmf.agent' in ah:
+      agent_addr = ah['qmf.agent']
     codec = Codec(msg.body)
     while True:
       opcode, seq = self._checkHeader(codec)
@@ -1926,7 +2184,7 @@ class Broker:
       elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
       elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
       elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
-      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
+      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq, agent_addr)
       elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
       elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
       elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
@@ -2014,15 +2272,12 @@ class Event:
     self.classKey = ClassKey(codec)
     self.timestamp = codec.read_int64()
     self.severity = codec.read_uint8()
-    self.schema = None
-    pname = self.classKey.getPackageName()
-    pkey = self.classKey.getPackageKey()
-    if pname in session.packages:
-      if pkey in session.packages[pname]:
-        self.schema = session.packages[pname][pkey]
-        self.arguments = {}
-        for arg in self.schema.arguments:
-          self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
+    self.schema = session.schemaCache.getSchema(self.classKey)
+    if not self.schema:
+      return
+    self.arguments = {}
+    for arg in self.schema.arguments:
+      self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
 
   def __repr__(self):
     if self.schema == None:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org