You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/19 16:14:03 UTC

svn commit: r925260 - in /qpid/branches/qmf-devel0.7a/qpid: cpp/src/qpid/agent/ManagementAgentImpl.cpp cpp/src/qpid/agent/ManagementAgentImpl.h extras/qmf/src/py/qmf/console.py

Author: tross
Date: Fri Mar 19 15:14:02 2010
New Revision: 925260

URL: http://svn.apache.org/viewvc?rev=925260&view=rev
Log:
Added functioning agent discovery, heartbeat, and age-out to C++ agent and Python console.

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=925260&r1=925259&r2=925260&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Mar 19 15:14:02 2010
@@ -242,7 +242,7 @@ void ManagementAgentImpl::raiseEvent(con
     headers["qmf.agent"] = name_address;
 
     content.encode();
-    connThreadBody.sendBuffer(msg.getContent(), 0,
+    connThreadBody.sendBuffer(msg.getContent(), "",
                               headers,
                               "qmf.default.topic", key.str());
 }
@@ -264,7 +264,7 @@ uint32_t ManagementAgentImpl::pollCallba
         methodQueue.pop_front();
         {
             Mutex::ScopedUnlock unlock(agentLock);
-            invokeMethodRequest(item->body, item->sequence, item->replyTo);
+            invokeMethodRequest(item->body, item->cid, item->replyTo);
             delete item;
         }
     }
@@ -353,8 +353,10 @@ void ManagementAgentImpl::sendHeartbeat(
     headers["qmf.agent"] = name_address;
 
     map["_values"] = attrMap;
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
     content.encode();
-    connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key);
+    connThreadBody.sendBuffer(msg.getContent(), "", headers, addr_exchange, addr_key);
 
     QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
 }
@@ -470,7 +472,7 @@ void ManagementAgentImpl::handleConsoleA
     QPID_LOG(trace, "RCVD ConsoleAddedInd");
 }
 
-void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo)
 {
     string   methodName;
     qpid::messaging::Message inMsg(body);
@@ -521,10 +523,10 @@ void ManagementAgentImpl::invokeMethodRe
     headers["qmf.agent"] = name_address;
 
     outMap.encode();
-    connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+    connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
 }
 
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, const string& cid, const string& replyTo)
 {
     FieldTable           ft;
     FieldTable::ValuePtr value;
@@ -565,11 +567,11 @@ void ManagementAgentImpl::handleGetQuery
             headers["qmf.agent"] = name_address;
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
-        sendCommandComplete(replyTo, sequence);
+        //sendCommandComplete(replyTo, sequence);
         return;
     }
 
@@ -600,21 +602,44 @@ void ManagementAgentImpl::handleGetQuery
             headers["qmf.agent"] = name_address;
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
     }
 
-    sendCommandComplete(replyTo, sequence);
+    //sendCommandComplete(replyTo, sequence);
 }
 
-void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+{
+    QPID_LOG(trace, "RCVD AgentLocateRequest");
+    static const string addr_exchange("qmf.default.direct");
+
+    messaging::Message msg;
+    messaging::MapContent content(msg);
+    messaging::Variant::Map& map(content.asMap());
+    messaging::Variant::Map headers;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_locate_response";
+    headers["qmf.agent"] = name_address;
+
+    map["_values"] = attrMap;
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
+    content.encode();
+    connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyTo);
+
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
 {
     if (extThread) {
         Mutex::ScopedLock lock(agentLock);
 
-        methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+        methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
         if (pipeHandle != 0) {
             pipeHandle->write("X", 1);
         } else if (notifyable != 0) {
@@ -633,7 +658,7 @@ void ManagementAgentImpl::handleMethodRe
             inCallback = false;
         }
     } else {
-        invokeMethodRequest(body, sequence, replyTo);
+        invokeMethodRequest(body, cid, replyTo);
     }
 
     QPID_LOG(trace, "RCVD MethodRequest");
@@ -642,32 +667,22 @@ void ManagementAgentImpl::handleMethodRe
 void ManagementAgentImpl::received(Message& msg)
 {
     string   replyToKey;
-    framing::MessageProperties p = msg.getMessageProperties();
-    if (p.hasReplyTo()) {
-        const framing::ReplyTo& rt = p.getReplyTo();
+    framing::MessageProperties mp = msg.getMessageProperties();
+    if (mp.hasReplyTo()) {
+        const framing::ReplyTo& rt = mp.getReplyTo();
         replyToKey = rt.getRoutingKey();
     }
 
-    if (msg.getHeaders().getAsString("app_id") == "qmf2")
+    if (mp.hasAppId() && mp.getAppId() == "qmf2")
     {
-        uint32_t sequence = 0;
-        std::string opcode = msg.getHeaders().getAsString("qmf.opcode");
-        std::string cid = msg.getMessageProperties().getCorrelationId();
-        if (!cid.empty()) {
-            try {
-                sequence = boost::lexical_cast<uint32_t>(cid);
-            } catch(const boost::bad_lexical_cast&) {
-                QPID_LOG(warning, "Bad correlation Id for received QMF request.");
-                return;
-            }
-        }
+        string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+        string cid = msg.getMessageProperties().getCorrelationId();
 
-        if (opcode == "_method_request") {
-            handleMethodRequest(msg.getData(), sequence, replyToKey);
-            return;
+        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
+        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToKey);
+        else {
+            QPID_LOG(trace, "Support for QMF Opcode [" << opcode << "] TBD!!!");
         }
-
-        QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
         return;
     }
 
@@ -684,7 +699,6 @@ void ManagementAgentImpl::received(Messa
         if      (opcode == 'a') handleAttachResponse(inBuffer);
         else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
         else if (opcode == 'x') handleConsoleAddedIndication();
-        else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
         else if (opcode == 'M')
             QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
     }
@@ -886,17 +900,14 @@ void ManagementAgentImpl::periodicProces
         content.encode();
         const std::string &str = m.getContent();
         if (str.length()) {
-            stringstream key;
             ::qpid::messaging::Variant::Map  headers;
-            key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
-                baseObject->getPackageName() << "." << baseObject->getClassName();
             headers["method"] = "indication";
             headers["qmf.opcode"] = "_data_indication";
             headers["qmf.content"] = "_data";
             headers["qmf.agent"] = name_address;
 
-            connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list");
-            QPID_LOG(trace, "SENT DataIndication key=" << key.str());
+            connThreadBody.sendBuffer(str, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+            QPID_LOG(trace, "SENT DataIndication");
         }
     }
 
@@ -936,6 +947,10 @@ void ManagementAgentImpl::ConnectionThre
                                      arg::exclusive=true);
                 session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
                                      arg::bindingKey=queueName.str());
+                session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+                                     arg::bindingKey=agent.name_address);
+                session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+                                     arg::bindingKey="console.#");
 
                 subscriptions->subscribe(agent, queueName.str(), dest);
                 QPID_LOG(info, "Connection established with broker");
@@ -1009,7 +1024,7 @@ void ManagementAgentImpl::ConnectionThre
 
 
 void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
-                                                       uint32_t sequence,
+                                                       const string& cid,
                                                        const qpid::messaging::VariantMap headers,
                                                        const string& exchange,
                                                        const string& routingKey,
@@ -1018,11 +1033,9 @@ void ManagementAgentImpl::ConnectionThre
     Message msg;
     qpid::messaging::VariantMap::const_iterator i;
 
-    if (sequence) {
-        std::stringstream seqstr;
-        seqstr << sequence;
-        msg.getMessageProperties().setCorrelationId(seqstr.str());
-    }
+    if (!cid.empty())
+        msg.getMessageProperties().setCorrelationId(cid);
+
     if (!contentType.empty())
         msg.getMessageProperties().setContentType(contentType);
     for (i = headers.begin(); i != headers.end(); ++i) {

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=925260&r1=925259&r2=925260&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Fri Mar 19 15:14:02 2010
@@ -123,10 +123,10 @@ class ManagementAgentImpl : public Manag
     };
 
     struct QueuedMethod {
-        QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
-            sequence(_seq), replyTo(_reply), body(_body) {}
+    QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) :
+        cid(_cid), replyTo(_reply), body(_body) {}
 
-        uint32_t    sequence;
+        std::string cid;
         std::string replyTo;
         std::string body;
     };
@@ -205,7 +205,7 @@ class ManagementAgentImpl : public Manag
                         const std::string&     exchange,
                         const std::string&     routingKey);
         void sendBuffer(const std::string&     data,
-                        const uint32_t sequence,
+                        const std::string&     cid,
                         const qpid::messaging::VariantMap headers,
                         const std::string&     exchange,
                         const std::string&     routingKey,
@@ -263,9 +263,11 @@ class ManagementAgentImpl : public Manag
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
     void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
-    void invokeMethodRequest  (const std::string& body, uint32_t sequence, std::string replyTo);
-    void handleGetQuery       (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
-    void handleMethodRequest  (const std::string& body, uint32_t sequence, std::string replyTo);
+    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo);
+
+    void handleGetQuery       (qpid::framing::Buffer& inBuffer, const std::string& cid, const std::string& replyTo);
+    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
+    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
     void handleConsoleAddedIndication();
 };
 

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=925260&r1=925259&r2=925260&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Fri Mar 19 15:14:02 2010
@@ -352,7 +352,6 @@ class Object(object):
     raise Exception("Invalid Method (software defect) [%s]" % name)
 
   def _encodeUnmanaged(self, codec):
-
     codec.write_uint8(20) 
     codec.write_str8(self._schema.getKey().getPackageName())
     codec.write_str8(self._schema.getKey().getClassName())
@@ -482,7 +481,7 @@ class Session:
 
     self.brokers.append(broker)
     if not self.manageConnections:
-      self.getObjects(broker=broker, _class="agent")
+      self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
     return broker
 
   def delBroker(self, broker):
@@ -873,6 +872,7 @@ class Session:
     timestamp = codec.read_uint64()
     if self.console != None and agent != None:
       self.console.heartbeat(agent, timestamp)
+    broker._ageAgents()
 
   def _handleEventInd(self, broker, codec, seq):
     if self.console != None:
@@ -927,6 +927,39 @@ class Session:
       if stat:
         self.console.objectStats(broker, object)
 
+  def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
+    brokerBank = 1
+    agentName = ah["qmf.agent"]
+    values = content["_values"]
+    timestamp = values["timestamp"]
+    interval = values["heartbeat_interval"]
+    if agentName == None:
+      return
+    agent = broker.getAgent(brokerBank, agentName)
+    if agent == None:
+      agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
+      broker._addAgent(agentName, agent)
+    else:
+      agent.touch()
+    if self.console and agent:
+      self.console.heartbeat(agent, timestamp)
+    broker._ageAgents()
+
+  def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
+    self._v2HandleHeartbeatInd(broker, mp, ah, content)
+
+  def _v2HandleDataInd(self, broker, mp, ah, content):
+    pass
+
+  def _v2HandleQueryRsp(self, broker, mp, ah, content):
+    pass
+
+  def _v2HandleMethodRsp(self, broker, mp, ah, content):
+    pass
+
+  def _v2HandleException(self, broker, mp, ah, content):
+    pass
+
   def _handleError(self, error):
     try:
       self.cv.acquire()
@@ -1557,6 +1590,7 @@ class Broker:
     self.authUser = authUser
     self.authPass = authPass
     self.cv = Condition()
+    self.agentLock = Lock()
     self.error = None
     self.brokerId = None
     self.connected = False
@@ -1590,8 +1624,12 @@ class Broker:
   def getAgent(self, brokerBank, agentBank):
     """ Return the agent object associated with a particular broker and agent bank value."""
     bankKey = (brokerBank, agentBank)
-    if bankKey in self.agents:
-      return self.agents[bankKey]
+    try:
+      self.agentLock.acquire()
+      if bankKey in self.agents:
+        return self.agents[bankKey]
+    finally:
+      self.agentLock.release()
     return None
 
   def getSessionId(self):
@@ -1600,7 +1638,11 @@ class Broker:
 
   def getAgents(self):
     """ Get the list of agents reachable via this broker """
-    return self.agents.values()
+    try:
+      self.agentLock.acquire()
+      return self.agents.values()
+    finally:
+      self.agentLock.release()
 
   def getAmqpSession(self):
     """ Get the AMQP session object for this connected broker. """
@@ -1629,8 +1671,13 @@ class Broker:
 
   def _tryToConnect(self):
     try:
-      self.agents = {}
-      self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+      try:
+        self.agentLock.acquire()
+        self.agents = {}
+        self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+      finally:
+        self.agentLock.release()
+
       self.topicBound = False
       self.syncInFlight = False
       self.syncRequest = 0
@@ -1679,6 +1726,24 @@ class Broker:
       self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
       self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
 
+      ##
+      ## Set up connectivity for QMFv2
+      ##
+      self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
+      self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+      self.amqpSession.exchange_bind(exchange="qmf.default.direct",
+                                     queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+      self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+                                     queue=self.v2_queue_name, binding_key="agent.#")
+      ## Other bindings here...
+      self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+                                         accept_mode=self.amqpSession.accept_mode.none,
+                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+      self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
+      self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
+      self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
+      self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+
       self.connected = True
       self.session._handleBrokerConnect(self)
 
@@ -1686,6 +1751,7 @@ class Broker:
       self._setHeader(codec, 'B')
       msg = self._message(codec.encoded)
       self._send(msg)
+      self._v2SendAgentLocate()
 
     except socket.error, e:
       self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
@@ -1699,17 +1765,65 @@ class Broker:
 
   def _updateAgent(self, obj):
     bankKey = (obj.brokerBank, obj.agentBank)
+    agent = None
     if obj._deleteTime == 0:
-      if bankKey not in self.agents:
-        agent = Agent(self, obj.agentBank, obj.label)
-        self.agents[bankKey] = agent
-        if self.session.console != None:
-          self.session.console.newAgent(agent)
+      try:
+        self.agentLock.acquire()
+        if bankKey not in self.agents:
+          agent = Agent(self, obj.agentBank, obj.label)
+          self.agents[bankKey] = agent
+      finally:
+        self.agentLock.release()
+      if agent and self.session.console:
+        self.session.console.newAgent(agent)
     else:
-      agent = self.agents.pop(bankKey, None)
-      if agent != None and self.session.console != None:
+      try:
+        self.agentLock.acquire()
+        agent = self.agents.pop(bankKey, None)
+      finally:
+        self.agentLock.release()
+      if agent and self.session.console:
         self.session.console.delAgent(agent)
 
+  def _addAgent(self, name, agent):
+    try:
+      self.agentLock.acquire()
+      self.agents[(1, name)] = agent
+    finally:
+      self.agentLock.release()
+    if self.session.console:
+      self.session.console.newAgent(agent)
+
+  def _ageAgents(self):
+    try:
+      self.agentLock.acquire()
+      to_delete = []
+      to_notify = []
+      for key in self.agents:
+        if self.agents[key].isOld():
+          to_delete.append(key)
+      for key in to_delete:
+        to_notify.append(self.agents.pop(key, None))
+    finally:
+      self.agentLock.release()
+    if self.session.console:
+      for agent in to_notify:
+        self.session.console.delAgent(agent)
+
+  def _v2SendAgentLocate(self, predicate={}):
+    dp = self.amqpSession.delivery_properties()
+    dp.routing_key = "console.request.agent_locate"
+    mp = self.amqpSession.message_properties()
+    mp.content_type = "amqp/map"
+    mp.user_id = self.authUser
+    mp.app_id = "qmf2"
+    mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+    mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
+    sendCodec = Codec()
+    sendCodec.write_map(predicate)
+    msg = Message(dp, mp, sendCodec.encoded)
+    self._send(msg, "qmf.default.topic")
+
   def _setHeader(self, codec, opcode, seq=0):
     """ Compose the header of a management message. """
     codec.write_uint8(ord('A'))
@@ -1819,6 +1933,28 @@ class Broker:
     self.session.receiver._completed.add(msg.id)
     self.session.channel.session_completed(self.session.receiver._completed)
 
+  def _v2Cb(self, msg):
+    dp = msg.get("delivery_properties")
+    mp = msg.get("message_properties")
+    ah = mp["application_headers"]
+    opcode = ah["qmf.opcode"]
+    codec = Codec(msg.body)
+
+    if mp.content_type == "amqp/list":
+      content = codec.read_list()
+    elif mp.content_type == "amqp/map":
+      content = codec.read_map()
+    else:
+      return
+
+    if   opcode == None: return
+    elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+    elif opcode == '_agent_locate_response':      self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+    elif opcode == '_data_indication':            self.session._v2HandleDataInd(self, mp, ah, content)
+    elif opcode == '_query_response':             self.session._v2HandleQueryRsp(self, mp, ah, content)
+    elif opcode == '_method_response':            self.session._v2HandleMethodRsp(self, mp, ah, content)
+    elif opcode == '_exception':                  self.session._v2HandleException(self, mp, ah, content)
+
   def _exceptionCb(self, data):
     self.connected = False
     self.error = data
@@ -1835,14 +1971,31 @@ class Broker:
 
 class Agent:
   """ """
-  def __init__(self, broker, agentBank, label):
+  def __init__(self, broker, agentBank, label, isV2=False, interval=0):
     self.broker = broker
     self.brokerBank = broker.getBrokerBank()
     self.agentBank = agentBank
     self.label = label
+    self.isV2 = isV2
+    self.heartbeatInterval = interval
+    self.lastSeenTime = time()
+
+  def touch(self):
+    self.lastSeenTime = time()
+
+  def isOld(self):
+    if self.heartbeatInterval == 0:
+      return None
+    if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+      return True
+    return None
 
   def __repr__(self):
-    return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
+    if self.isV2:
+      ver = "v2"
+    else:
+      ver = "v1"
+    return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
 
   def getBroker(self):
     return self.broker



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org