You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/08/01 20:36:25 UTC
svn commit: r681773 - in /incubator/qpid/trunk/qpid:
cpp/src/qpid/agent/ManagementAgentImpl.cpp
cpp/src/qpid/management/ManagementBroker.cpp
cpp/src/qpid/management/ManagementBroker.h specs/management-schema.xml
Author: tross
Date: Fri Aug 1 11:36:25 2008
New Revision: 681773
URL: http://svn.apache.org/viewvc?rev=681773&view=rev
Log:
QPID-1174 - Clean up agent objects when the remote agent disconnects
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Aug 1 11:36:25 2008
@@ -112,7 +112,6 @@
EncodeHeader (buffer, 'A');
buffer.putShortString ("RemoteAgent [C++]");
- buffer.putShortString (queueName.str());
systemId.encode (buffer);
buffer.putLong (11);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri Aug 1 11:36:25 2008
@@ -26,6 +26,7 @@
#include <qpid/broker/MessageDelivery.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
+#include "qpid/broker/ConnectionState.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -375,7 +376,10 @@
iter++)
managementObjects.erase (*iter);
- deleteList.clear ();
+ if (!deleteList.empty()) {
+ deleteList.clear();
+ deleteOrphanedAgentsLH();
+ }
}
void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
@@ -664,44 +668,72 @@
return requestedBank;
}
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::deleteOrphanedAgentsLH()
+{
+ vector<uint64_t> deleteList;
+
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
+ uint64_t connectionRef = aIter->first;
+ bool found = false;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++) {
+ if (iter->first == connectionRef && !iter->second->isDeleted()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ deleteList.push_back(connectionRef);
+ delete aIter->second;
+ }
+ }
+
+ for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
+
+ remoteAgents.erase(*dIter);
+ }
+
+ deleteList.clear();
+}
+
+void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- string sessionName;
+ uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
- inBuffer.getShortString (label);
- inBuffer.getShortString (sessionName);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
-
- // TODO: Make a pass over the agents and delete any that no longer have a session.
-
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
- if (aIter != remoteAgents.end())
- {
+ moveNewObjectsLH();
+ deleteOrphanedAgentsLH();
+ RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
+ if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent");
+ sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
- // TODO: Reject requests for which the session name does not match an existing session.
+ inBuffer.getShortString (label);
+ systemId.decode (inBuffer);
+ requestedBank = inBuffer.getLong ();
+ assignedBank = assignBankLH (requestedBank);
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
agent->routingKey = replyToKey;
+ agent->connectionRef = connectionRef;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionName (sessionName);
+ agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
agent->mgmtObject->set_objectIdBank (assignedBank);
addObject (agent->mgmtObject);
- remoteAgents[sessionName] = agent;
+ remoteAgents[connectionRef] = agent;
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -788,7 +820,7 @@
else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Fri Aug 1 11:36:25 2008
@@ -26,6 +26,7 @@
#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/broker/ConnectionToken.h"
#include "qpid/agent/ManagementAgent.h"
#include "ManagementObject.h"
#include "Manageable.h"
@@ -87,6 +88,7 @@
{
uint32_t objIdBank;
std::string routingKey;
+ uint64_t connectionRef;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -95,8 +97,8 @@
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
+ typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
//
@@ -192,6 +194,7 @@
bool bankInUse (uint32_t bank);
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
+ void deleteOrphanedAgentsLH();
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
@@ -201,7 +204,7 @@
void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=681773&r1=681772&r2=681773&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Aug 1 11:36:25 2008
@@ -102,11 +102,11 @@
===============================================================
-->
<class name="Agent">
- <property name="sessionName" type="sstr" access="RO" index="y" desc="Session ID for Agent"/>
- <property name="label" type="sstr" access="RO" desc="Label for agent"/>
- <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
- <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
- <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/>
+ <property name="connectionRef" type="objId" references="Connection" access="RO" index="y"/>
+ <property name="label" type="sstr" access="RO" desc="Label for agent"/>
+ <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
+ <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
+ <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/>
</class>
<!--