You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/04/15 21:37:20 UTC

svn commit: r934561 - in /qpid/trunk/qpid/cpp/src/qpid/management: ManagementAgent.cpp ManagementAgent.h

Author: kgiusti
Date: Thu Apr 15 19:37:20 2010
New Revision: 934561

URL: http://svn.apache.org/viewvc?rev=934561&view=rev
Log:
QPID-2507: drop the userLock before calling exchange->route()

Modified:
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=934561&r1=934560&r2=934561&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Apr 15 19:37:20 2010
@@ -342,7 +342,7 @@ void ManagementAgent::raiseEvent(const M
         outBuffer.putRawData(sBuf);
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
-        sendBuffer(outBuffer, outLen, mExchange,
+        sendBufferLH(outBuffer, outLen, mExchange,
                    "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
         QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
     }
@@ -372,7 +372,7 @@ void ManagementAgent::raiseEvent(const M
 
         string content;
         MapCodec::encode(map_, content);
-        sendBuffer(content, "", headers, v2Topic, key.str());
+        sendBufferLH(content, "", headers, "amqp/map", v2Topic, key.str());
         QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
     }
 
@@ -393,6 +393,8 @@ void ManagementAgent::Periodic::fire ()
 
 void ManagementAgent::clientAdded (const std::string& routingKey)
 {
+    sys::Mutex::ScopedLock lock(userLock);
+
     if (routingKey.find("console") != 0)
         return;
 
@@ -407,7 +409,7 @@ void ManagementAgent::clientAdded (const
         encodeHeader(outBuffer, 'x');
         outLen = outBuffer.getPosition();
         outBuffer.reset();
-        sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey);
+        sendBufferLH(outBuffer, outLen, dExchange, aIter->second->routingKey);
         QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
     }
 }
@@ -441,10 +443,11 @@ bool ManagementAgent::checkHeader (Buffe
     return h1 == 'A' && h2 == 'M' && h3 == '2';
 }
 
-void ManagementAgent::sendBuffer(Buffer&  buf,
-                                 uint32_t length,
-                                 qpid::broker::Exchange::shared_ptr exchange,
-                                 string   routingKey)
+// NOTE WELL: assumes userLock is held by caller (LH)
+void ManagementAgent::sendBufferLH(Buffer&  buf,
+                                   uint32_t length,
+                                   qpid::broker::Exchange::shared_ptr exchange,
+                                   string   routingKey)
 {
     if (suppressed) {
         QPID_LOG(trace, "Suppressed management message to " << routingKey);
@@ -477,18 +480,24 @@ void ManagementAgent::sendBuffer(Buffer&
 
     msg->getFrames().append(content);
 
-    DeliverableMessage deliverable (msg);
-    try {
-        exchange->route(deliverable, routingKey, 0);
-    } catch(exception&) {}
+    {
+        sys::Mutex::ScopedUnlock u(userLock);
+
+        DeliverableMessage deliverable (msg);
+        try {
+            exchange->route(deliverable, routingKey, 0);
+        } catch(exception&) {}
+    }
 }
 
 
-void ManagementAgent::sendBuffer(const std::string& data,
-                                 const std::string& cid,
-                                 const Variant::Map& headers,
-                                 qpid::broker::Exchange::shared_ptr exchange,
-                                 const std::string& routingKey)
+// NOTE WELL: assumes userLock is held by caller (LH)
+void ManagementAgent::sendBufferLH(const std::string& data,
+                                   const std::string& cid,
+                                   const Variant::Map& headers,
+                                   const std::string& content_type,
+                                   qpid::broker::Exchange::shared_ptr exchange,
+                                   const std::string& routingKey)
 {
     Variant::Map::const_iterator i;
 
@@ -517,6 +526,7 @@ void ManagementAgent::sendBuffer(const s
     if (!cid.empty()) {
         props->setCorrelationId(cid);
     }
+    props->setContentType(content_type);
 
     for (i = headers.begin(); i != headers.end(); ++i) {
         msg->getOrInsertHeaders().setString(i->first, i->second.asString());
@@ -529,10 +539,14 @@ void ManagementAgent::sendBuffer(const s
 
     msg->getFrames().append(content);
 
-    DeliverableMessage deliverable (msg);
-    try {
-        exchange->route(deliverable, routingKey, 0);
-    } catch(exception&) {}
+    {
+        sys::Mutex::ScopedUnlock u(userLock);
+
+        DeliverableMessage deliverable (msg);
+        try {
+            exchange->route(deliverable, routingKey, 0);
+        } catch(exception&) {}
+    }
 }
 
 
@@ -683,7 +697,7 @@ void ManagementAgent::periodicProcessing
                     msgBuffer.reset();
                     stringstream key;
                     key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
-                    sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+                    sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
                     QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
                 }
             }
@@ -701,7 +715,7 @@ void ManagementAgent::periodicProcessing
                     headers["qmf.content"] = "_data";
                     headers["qmf.agent"] = name_address;
 
-                    sendBuffer(content, "", headers, v2Topic, key.str());
+                    sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
                     QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
                 }
             }
@@ -732,7 +746,7 @@ void ManagementAgent::periodicProcessing
                 msgBuffer.reset ();
                 stringstream key;
                 key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
-                sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+                sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
                 QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
             }
 
@@ -760,7 +774,7 @@ void ManagementAgent::periodicProcessing
 
                 string content;
                 ListCodec::encode(list_, content);
-                sendBuffer(content, "", headers, v2Topic, key.str());
+                sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
                 QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
             }
         }
@@ -784,7 +798,7 @@ void ManagementAgent::periodicProcessing
         contentSize = BUFSIZE - msgBuffer.available ();
         msgBuffer.reset ();
         routingKey = "console.heartbeat.1.0";
-        sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+        sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
         QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
 
@@ -804,7 +818,7 @@ void ManagementAgent::periodicProcessing
 
         string content;
         MapCodec::encode(map, content);
-        sendBuffer(content, "", headers, v2Topic, addr_key);
+        sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key);
 
         QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
     }
@@ -834,7 +848,7 @@ void ManagementAgent::deleteObjectNowLH(
         msgBuffer.reset();
         stringstream key;
         key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
-        sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+        sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
         QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
     }
 
@@ -862,15 +876,15 @@ void ManagementAgent::deleteObjectNowLH(
 
         string content;
         ListCodec::encode(list_, content);
-        sendBuffer(content, "", headers, v2Topic, key.str());
+        sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
         QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
     }
 
     managementObjects.erase(oid);
 }
 
-void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
-                                           uint32_t code, string text)
+void ManagementAgent::sendCommandCompleteLH(string replyToKey, uint32_t sequence,
+                                            uint32_t code, string text)
 {
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
@@ -880,7 +894,7 @@ void ManagementAgent::sendCommandComplet
     outBuffer.putShortString (text);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
     QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
              replyToKey << " seq=" << sequence);
 }
@@ -987,7 +1001,7 @@ void ManagementAgent::handleMethodReques
         outBuffer.putMediumString(i->second);
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
-        sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+        sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
         QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
         return;
     }
@@ -1003,7 +1017,7 @@ void ManagementAgent::handleMethodReques
             outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+            sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
             QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
             return;
         }
@@ -1035,7 +1049,7 @@ void ManagementAgent::handleMethodReques
 
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
-    sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+    sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
     QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
 }
 
@@ -1064,7 +1078,7 @@ void ManagementAgent::handleMethodReques
         (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
 
         MapCodec::encode(outMap, content);
-        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
         QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
         return;
     }
@@ -1087,7 +1101,7 @@ void ManagementAgent::handleMethodReques
         (outMap["_values"].asMap())["_status_text"] = e.what();
 
         MapCodec::encode(outMap, content);
-        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
         QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
         return;
     }
@@ -1100,7 +1114,7 @@ void ManagementAgent::handleMethodReques
         (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
 
         MapCodec::encode(outMap, content);
-        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
         QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
         return;
     }
@@ -1116,7 +1130,7 @@ void ManagementAgent::handleMethodReques
         (outMap["_values"].asMap())["_status_text"] = i->second;
 
         MapCodec::encode(outMap, content);
-        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
         QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
         return;
     }
@@ -1133,7 +1147,7 @@ void ManagementAgent::handleMethodReques
             (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
 
             MapCodec::encode(outMap, content);
-            sendBuffer(content, cid, headers, v2Direct, replyTo);
+            sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
             QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
             return;
         }
@@ -1146,6 +1160,7 @@ void ManagementAgent::handleMethodReques
              methodName << " replyTo=" << replyTo);
 
     try {
+        sys::Mutex::ScopedUnlock u(userLock);
         iter->second->doMethod(methodName, inArgs, outMap);
     } catch(exception& e) {
         outMap.clear();
@@ -1154,13 +1169,13 @@ void ManagementAgent::handleMethodReques
         (outMap["_values"].asMap())["_status_text"] = e.what();
 
         MapCodec::encode(outMap, content);
-        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
         QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
         return;
     }
 
     MapCodec::encode(outMap, content);
-    sendBuffer(content, cid, headers, v2Direct, replyTo);
+    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
     QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
 }
 
@@ -1177,7 +1192,7 @@ void ManagementAgent::handleBrokerReques
 
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
     QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
 }
 
@@ -1196,11 +1211,11 @@ void ManagementAgent::handlePackageQuery
         encodePackageIndication (outBuffer, pIter);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
-        sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
         QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
     }
 
-    sendCommandComplete (replyToKey, sequence);
+    sendCommandCompleteLH(replyToKey, sequence);
 }
 
 void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -1239,13 +1254,13 @@ void ManagementAgent::handleClassQueryLH
                 encodeClassIndication(outBuffer, pIter, cIter);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
                          "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence);
             }
         }
     }
-    sendCommandComplete(replyToKey, sequence);
+    sendCommandCompleteLH(replyToKey, sequence);
 }
 
 void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -1274,7 +1289,7 @@ void ManagementAgent::handleClassIndLH (
         key.encode(outBuffer);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
-        sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
         QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
                  "), to=" << replyToKey << " seq=" << sequence);
 
@@ -1324,17 +1339,17 @@ void ManagementAgent::handleSchemaReques
                 classInfo.appendSchema(outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
             }
             else
-                sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
+                sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
         }
         else
-            sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
+            sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
     }
     else
-        sendCommandComplete(replyToKey, sequence, 1, "Package not found");
+        sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
 }
 
 void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
@@ -1371,7 +1386,7 @@ void ManagementAgent::handleSchemaRespon
                 encodeClassIndication(outBuffer, pIter, cIter);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBuffer(outBuffer, outLen, mExchange, "schema.class");
+                sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
                 QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
                          " to=schema.class");
             }
@@ -1448,7 +1463,7 @@ void ManagementAgent::handleAttachReques
     RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
     if (aIter != remoteAgents.end()) {
         // There already exists an agent on this session.  Reject the request.
-        sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
+        sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent");
         return;
     }
 
@@ -1488,7 +1503,7 @@ void ManagementAgent::handleAttachReques
     outBuffer.putLong (assignedBank);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
     QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
              " to=" << replyToKey << " seq=" << sequence);
 }
@@ -1530,11 +1545,11 @@ void ManagementAgent::handleGetQueryLH (
                 outBuffer.putRawData(sBuf);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
             }
         }
-        sendCommandComplete(replyToKey, sequence);
+        sendCommandCompleteLH(replyToKey, sequence);
         return;
     }
 
@@ -1561,13 +1576,13 @@ void ManagementAgent::handleGetQueryLH (
                 outBuffer.putRawData(sBuf);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
             }
         }
     }
 
-    sendCommandComplete(replyToKey, sequence);
+    sendCommandCompleteLH(replyToKey, sequence);
 }
 
 
@@ -1636,7 +1651,7 @@ void ManagementAgent::handleGetQueryLH(c
                     list_.push_back(map_);
 
                     ListCodec::encode(list_, content);
-                    sendBuffer(content, cid, headers, v2Direct, replyTo);
+                    sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
                 }
             }
         }
@@ -1659,7 +1674,7 @@ void ManagementAgent::handleGetQueryLH(c
                 list_.push_back(map_);
 
                 ListCodec::encode(list_, content);
-                sendBuffer(content, cid, headers, v2Direct, replyTo);
+                sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
                 }
             }
         }
@@ -1669,7 +1684,7 @@ void ManagementAgent::handleGetQueryLH(c
     list_.clear();
     headers.erase("partial");
     ListCodec::encode(list_, content);
-    sendBuffer(content, cid, headers, v2Direct, replyTo);
+    sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
     QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
 }
 
@@ -1692,7 +1707,7 @@ void ManagementAgent::handleLocateReques
 
     string content;
     MapCodec::encode(map, content);
-    sendBuffer(content, cid, headers, v2Direct, replyTo);
+    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
 
     QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
 }
@@ -1832,7 +1847,7 @@ bool ManagementAgent::authorizeAgentMess
 
                 string content;
                 MapCodec::encode(outMap, content);
-                sendBuffer(content, cid, headers, v2Direct, replyToKey);
+                sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
 
             } else {
 
@@ -1844,7 +1859,7 @@ bool ManagementAgent::authorizeAgentMess
                 outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
             }
 
             QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -1947,10 +1962,10 @@ ManagementAgent::PackageMap::iterator Ma
     encodePackageIndication (outBuffer, result.first);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    sendBuffer (outBuffer, outLen, mExchange, "schema.package");
-    QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
+    sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
+    QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package");
 
-        return result.first;
+    return result.first;
 }
 
 void ManagementAgent::addClassLH(uint8_t               kind,
@@ -2312,6 +2327,7 @@ void ManagementAgent::importAgents(qpid:
     Variant::List content;
     ListCodec::decode(buf, content);
     Variant::List::const_iterator l;
+    sys::Mutex::ScopedLock lock(userLock);
 
     for (l = content.begin(); l != content.end(); l++) {
         std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=934561&r1=934560&r2=934561&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Apr 15 19:37:20 2010
@@ -301,15 +301,16 @@ private:
     void deleteObjectNowLH(const ObjectId& oid);
     void encodeHeader       (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
     bool checkHeader        (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
-    void sendBuffer         (framing::Buffer&             buf,
-                             uint32_t                     length,
-                             qpid::broker::Exchange::shared_ptr exchange,
-                             std::string                  routingKey);
-    void sendBuffer(const std::string&     data,
-                    const std::string&     cid,
-                    const qpid::types::Variant::Map& headers,
-                    qpid::broker::Exchange::shared_ptr exchange,
-                    const std::string& routingKey);
+    void sendBufferLH(framing::Buffer&             buf,
+                      uint32_t                     length,
+                      qpid::broker::Exchange::shared_ptr exchange,
+                      std::string                  routingKey);
+    void sendBufferLH(const std::string&     data,
+                      const std::string&     cid,
+                      const qpid::types::Variant::Map& headers,
+                      const std::string&     content_type,
+                      qpid::broker::Exchange::shared_ptr exchange,
+                      const std::string& routingKey);
     void moveNewObjectsLH();
 
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -330,7 +331,7 @@ private:
     uint32_t allocateNewBank ();
     uint32_t assignBankLH (uint32_t requestedPrefix);
     void deleteOrphanedAgentsLH();
-    void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+    void sendCommandCompleteLH(std::string replyToKey, uint32_t sequence,
                               uint32_t code = 0, std::string text = std::string("OK"));
     void handleBrokerRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handlePackageQueryLH   (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org