You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/08/01 20:36:25 UTC

svn commit: r681773 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/agent/ManagementAgentImpl.cpp cpp/src/qpid/management/ManagementBroker.cpp cpp/src/qpid/management/ManagementBroker.h specs/management-schema.xml

Author: tross
Date: Fri Aug  1 11:36:25 2008
New Revision: 681773

URL: http://svn.apache.org/viewvc?rev=681773&view=rev
Log:
QPID-1174 - Clean up agent objects when the remote agent disconnects

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Aug  1 11:36:25 2008
@@ -112,7 +112,6 @@
 
     EncodeHeader (buffer, 'A');
     buffer.putShortString ("RemoteAgent [C++]");
-    buffer.putShortString (queueName.str());
     systemId.encode  (buffer);
     buffer.putLong (11);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri Aug  1 11:36:25 2008
@@ -26,6 +26,7 @@
 #include <qpid/broker/MessageDelivery.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Time.h"
+#include "qpid/broker/ConnectionState.h"
 #include <list>
 #include <iostream>
 #include <fstream>
@@ -375,7 +376,10 @@
          iter++)
         managementObjects.erase (*iter);
 
-    deleteList.clear ();
+    if (!deleteList.empty()) {
+        deleteList.clear();
+        deleteOrphanedAgentsLH();
+    }
 }
 
 void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
@@ -664,44 +668,72 @@
     return requestedBank;
 }
 
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::deleteOrphanedAgentsLH()
+{
+    vector<uint64_t> deleteList;
+
+    for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
+        uint64_t connectionRef = aIter->first;
+        bool found = false;
+
+        for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+             iter != managementObjects.end ();
+             iter++) {
+            if (iter->first == connectionRef && !iter->second->isDeleted()) {
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            deleteList.push_back(connectionRef);
+            delete aIter->second;
+        }
+    }
+
+    for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
+        
+        remoteAgents.erase(*dIter);
+    }
+
+    deleteList.clear();
+}
+
+void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
 {
     string   label;
     uint32_t requestedBank;
     uint32_t assignedBank;
-    string   sessionName;
+    uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
     Uuid     systemId;
 
-    inBuffer.getShortString (label);
-    inBuffer.getShortString (sessionName);
-    systemId.decode  (inBuffer);
-    requestedBank = inBuffer.getLong ();
-    assignedBank  = assignBankLH (requestedBank);
-
-    // TODO: Make a pass over the agents and delete any that no longer have a session.
-
-    RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
-    if (aIter != remoteAgents.end())
-    {
+    moveNewObjectsLH();
+    deleteOrphanedAgentsLH();
+    RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
+    if (aIter != remoteAgents.end()) {
         // There already exists an agent on this session.  Reject the request.
-        sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent");
+        sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
         return;
     }
 
-    // TODO: Reject requests for which the session name does not match an existing session.
+    inBuffer.getShortString (label);
+    systemId.decode  (inBuffer);
+    requestedBank = inBuffer.getLong ();
+    assignedBank  = assignBankLH (requestedBank);
 
     RemoteAgent* agent = new RemoteAgent;
     agent->objIdBank  = assignedBank;
     agent->routingKey = replyToKey;
+    agent->connectionRef = connectionRef;
     agent->mgmtObject = new management::Agent (this, agent);
-    agent->mgmtObject->set_sessionName  (sessionName);
+    agent->mgmtObject->set_connectionRef(agent->connectionRef);
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
     agent->mgmtObject->set_systemId     (systemId);
     agent->mgmtObject->set_objectIdBank (assignedBank);
     addObject (agent->mgmtObject);
 
-    remoteAgents[sessionName] = agent;
+    remoteAgents[connectionRef] = agent;
 
     // Send an Attach Response
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -788,7 +820,7 @@
     else if (opcode == 'q') handleClassIndLH       (inBuffer, replyToKey, sequence);
     else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, replyToKey, sequence);
     else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
-    else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence);
+    else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
     else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
     else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Fri Aug  1 11:36:25 2008
@@ -26,6 +26,7 @@
 #include "qpid/broker/Timer.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/broker/ConnectionToken.h"
 #include "qpid/agent/ManagementAgent.h"
 #include "ManagementObject.h"
 #include "Manageable.h"
@@ -87,6 +88,7 @@
     {
         uint32_t          objIdBank;
         std::string       routingKey;
+        uint64_t          connectionRef;
         Agent*            mgmtObject;
         ManagementObject* GetManagementObject (void) const { return mgmtObject; }
         virtual ~RemoteAgent ();
@@ -95,8 +97,8 @@
     // TODO: Eventually replace string with entire reply-to structure.  reply-to
     //       currently assumes that the exchange is "amq.direct" even though it could
     //       in theory be specified differently.
-    typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
-    typedef std::vector<std::string>            ReplyToVector;
+    typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+    typedef std::vector<std::string>         ReplyToVector;
 
     //  Storage for known schema classes:
     //
@@ -192,6 +194,7 @@
     bool     bankInUse (uint32_t bank);
     uint32_t allocateNewBank ();
     uint32_t assignBankLH (uint32_t requestedPrefix);
+    void deleteOrphanedAgentsLH();
     void sendCommandComplete (std::string replyToKey, uint32_t sequence,
                               uint32_t code = 0, std::string text = std::string("OK"));
     void handleBrokerRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
@@ -201,7 +204,7 @@
     void handleClassIndLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleSchemaRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
 

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Aug  1 11:36:25 2008
@@ -102,11 +102,11 @@
   ===============================================================
   -->
   <class name="Agent">
-    <property name="sessionName"  type="sstr"   access="RO" index="y" desc="Session ID for Agent"/>
-    <property name="label"        type="sstr"   access="RO"           desc="Label for agent"/>
-    <property name="registeredTo" type="objId"  references="Broker" access="RO" desc="Broker agent is registered to"/>
-    <property name="systemId"     type="uuid"   access="RO"           desc="Identifier of system where agent resides"/>
-    <property name="objectIdBank" type="uint32" access="RO"           desc="Assigned object-id bank"/>
+    <property name="connectionRef" type="objId"  references="Connection" access="RO" index="y"/>
+    <property name="label"         type="sstr"   access="RO"           desc="Label for agent"/>
+    <property name="registeredTo"  type="objId"  references="Broker" access="RO" desc="Broker agent is registered to"/>
+    <property name="systemId"      type="uuid"   access="RO"           desc="Identifier of system where agent resides"/>
+    <property name="objectIdBank"  type="uint32" access="RO"           desc="Assigned object-id bank"/>
   </class>
 
   <!--