You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/05/03 22:15:22 UTC

svn commit: r1099225 - /qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp

Author: tross
Date: Tue May  3 20:15:22 2011
New Revision: 1099225

URL: http://svn.apache.org/viewvc?rev=1099225&view=rev
Log:
QPID-3241 - Deadlock in qmf agent triggered by producer flow control

Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1099225&r1=1099224&r2=1099225&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue May  3 20:15:22 2011
@@ -305,43 +305,47 @@ void ManagementAgentImpl::raiseEvent(con
         "emerg", "alert", "crit", "error", "warn",
         "note", "info", "debug"
     };
-    sys::Mutex::ScopedLock lock(agentLock);
-    Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+    string content;
     stringstream key;
+    Variant::Map headers;
 
-    // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
-    // event.getPackageName() << "." << event.getEventName();
-    key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
-        << "." << keyifyNameStr(event.getEventName())
-        << "." << severityStr[sev]
-        << "." << vendorNameKey
-        << "." << productNameKey
-        << "." << instanceNameKey;
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
+        Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+        uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
 
-    Variant::Map map_;
-    Variant::Map schemaId;
-    Variant::Map values;
-    Variant::Map headers;
-    string content;
+        // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+        // event.getPackageName() << "." << event.getEventName();
+        key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+            << "." << keyifyNameStr(event.getEventName())
+            << "." << severityStr[sev]
+            << "." << vendorNameKey
+            << "." << productNameKey
+            << "." << instanceNameKey;
+
+        Variant::Map map_;
+        Variant::Map schemaId;
+        Variant::Map values;
+
+        map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+                                               event.getEventName(),
+                                               event.getMd5Sum(),
+                                               ManagementItem::CLASS_KIND_EVENT);
+        event.mapEncode(values);
+        map_["_values"] = values;
+        map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+        map_["_severity"] = sev;
 
-    map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
-                                           event.getEventName(),
-                                           event.getMd5Sum(),
-                                           ManagementItem::CLASS_KIND_EVENT);
-    event.mapEncode(values);
-    map_["_values"] = values;
-    map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
-    map_["_severity"] = sev;
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_event";
+        headers["qmf.agent"] = name_address;
 
-    headers["method"] = "indication";
-    headers["qmf.opcode"] = "_data_indication";
-    headers["qmf.content"] = "_event";
-    headers["qmf.agent"] = name_address;
+        Variant::List list;
+        list.push_back(map_);
+        ListCodec::encode(list, content);
+    }
 
-    Variant::List list;
-    list.push_back(map_);
-    ListCodec::encode(list, content);
     connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
 }
 
@@ -521,9 +525,12 @@ void ManagementAgentImpl::sendException(
 
 void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
 {
-    sys::Mutex::ScopedLock lock(agentLock);
     string packageName;
     SchemaClassKey key;
+    uint32_t outLen(0);
+    char localBuffer[MA_BUFFER_SIZE];
+    Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
+    bool found(false);
 
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(key.name);
@@ -531,26 +538,30 @@ void ManagementAgentImpl::handleSchemaRe
 
     QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
 
-    PackageMap::iterator pIter = packages.find(packageName);
-    if (pIter != packages.end()) {
-        ClassMap& cMap = pIter->second;
-        ClassMap::iterator cIter = cMap.find(key);
-        if (cIter != cMap.end()) {
-            SchemaClass& schema = cIter->second;
-            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
-            string   body;
-
-            encodeHeader(outBuffer, 's', sequence);
-            schema.writeSchemaCall(body);
-            outBuffer.putRawData(body);
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
-
-            QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
+        PackageMap::iterator pIter = packages.find(packageName);
+        if (pIter != packages.end()) {
+            ClassMap& cMap = pIter->second;
+            ClassMap::iterator cIter = cMap.find(key);
+            if (cIter != cMap.end()) {
+                SchemaClass& schema = cIter->second;
+                string   body;
+
+                encodeHeader(outBuffer, 's', sequence);
+                schema.writeSchemaCall(body);
+                outBuffer.putRawData(body);
+                outLen = MA_BUFFER_SIZE - outBuffer.available();
+                outBuffer.reset();
+                found = true;
+            }
         }
     }
+
+    if (found) {
+        connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
+        QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+    }
 }
 
 void ManagementAgentImpl::handleConsoleAddedIndication()
@@ -969,18 +980,6 @@ ManagementAgentImpl::PackageMap::iterato
     pair<PackageMap::iterator, bool> result =
         packages.insert(pair<string, ClassMap>(name, ClassMap()));
 
-    if (connected) {
-        // Publish a package-indication message
-        Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-        uint32_t outLen;
-
-        encodeHeader(outBuffer, 'p');
-        encodePackageIndication(outBuffer, result.first);
-        outLen = MA_BUFFER_SIZE - outBuffer.available();
-        outBuffer.reset();
-        connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package");
-    }
-
     return result.first;
 }
 
@@ -1038,131 +1037,146 @@ void ManagementAgentImpl::encodeClassInd
     QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
 }
 
+struct MessageItem {
+    string content;
+    Variant::Map headers;
+    string key;
+    MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
+};
+
 void ManagementAgentImpl::periodicProcessing()
 {
     string addr_key_base = "agent.ind.data.";
-    sys::Mutex::ScopedLock lock(agentLock);
     list<ObjectId> deleteList;
-
-    if (!connected)
-        return;
+    list<boost::shared_ptr<MessageItem> > message_list;
 
     sendHeartbeat();
 
-    moveNewObjectsLH();
-
-    //
-    //  Clear the been-here flag on all objects in the map.
-    //
-    for (ObjectMap::iterator iter = managementObjects.begin();
-         iter != managementObjects.end();
-         iter++) {
-        ManagementObject* object = iter->second.get();
-        object->setFlags(0);
-        if (publishAllData) {
-            object->setForcePublish(true);
-        }
-    }
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
 
-    publishAllData = false;
+        if (!connected)
+            return;
 
-    //
-    //  Process the entire object map.
-    //
-    uint32_t v2Objs = 0;
-
-    for (ObjectMap::iterator baseIter = managementObjects.begin();
-         baseIter != managementObjects.end();
-         baseIter++) {
-        ManagementObject* baseObject = baseIter->second.get();
+        moveNewObjectsLH();
 
         //
-        //  Skip until we find a base object requiring a sent message.
+        //  Clear the been-here flag on all objects in the map.
         //
-        if (baseObject->getFlags() == 1 ||
-            (!baseObject->getConfigChanged() &&
-             !baseObject->getInstChanged() &&
-             !baseObject->getForcePublish() &&
-             !baseObject->isDeleted()))
-            continue;
-
-        std::string packageName = baseObject->getPackageName();
-        std::string className = baseObject->getClassName();
-
-        Variant::List list_;
-        string content;
-        std::stringstream addr_key;
-        Variant::Map  headers;
-
-        addr_key << addr_key_base;
-        addr_key << keyifyNameStr(packageName)
-                 << "." << keyifyNameStr(className)
-                 << "." << vendorNameKey
-                 << "." << productNameKey
-                 << "." << instanceNameKey;
-
-        headers["method"] = "indication";
-        headers["qmf.opcode"] = "_data_indication";
-        headers["qmf.content"] = "_data";
-        headers["qmf.agent"] = name_address;
-
-        for (ObjectMap::iterator iter = baseIter;
+        for (ObjectMap::iterator iter = managementObjects.begin();
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second.get();
-            bool send_stats, send_props;
-            if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
-                object->setFlags(1);
-                if (object->getConfigChanged() || object->getInstChanged())
-                    object->setUpdateTime();
+            object->setFlags(0);
+            if (publishAllData) {
+                object->setForcePublish(true);
+            }
+        }
+
+        publishAllData = false;
+
+        //
+        //  Process the entire object map.
+        //
+        uint32_t v2Objs = 0;
 
-                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
-                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+        for (ObjectMap::iterator baseIter = managementObjects.begin();
+             baseIter != managementObjects.end();
+             baseIter++) {
+            ManagementObject* baseObject = baseIter->second.get();
+
+            //
+            //  Skip until we find a base object requiring a sent message.
+            //
+            if (baseObject->getFlags() == 1 ||
+                (!baseObject->getConfigChanged() &&
+                 !baseObject->getInstChanged() &&
+                 !baseObject->getForcePublish() &&
+                 !baseObject->isDeleted()))
+                continue;
+
+            std::string packageName = baseObject->getPackageName();
+            std::string className = baseObject->getClassName();
+
+            Variant::List list_;
+            std::stringstream addr_key;
+            Variant::Map  headers;
+
+            addr_key << addr_key_base;
+            addr_key << keyifyNameStr(packageName)
+                     << "." << keyifyNameStr(className)
+                     << "." << vendorNameKey
+                     << "." << productNameKey
+                     << "." << instanceNameKey;
+
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = name_address;
+
+            for (ObjectMap::iterator iter = baseIter;
+                 iter != managementObjects.end();
+                 iter++) {
+                ManagementObject* object = iter->second.get();
+                bool send_stats, send_props;
+                if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+                    object->setFlags(1);
+                    if (object->getConfigChanged() || object->getInstChanged())
+                        object->setUpdateTime();
+
+                    send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                    send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+                    if (send_stats || send_props) {
+                        Variant::Map map_;
+                        Variant::Map values;
+                        Variant::Map oid;
+
+                        object->getObjectId().mapEncode(oid);
+                        map_["_object_id"] = oid;
+                        map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                               object->getClassName(),
+                                                               object->getMd5Sum());
+                        object->writeTimestamps(map_);
+                        object->mapEncodeValues(values, send_props, send_stats);
+                        map_["_values"] = values;
+                        list_.push_back(map_);
 
-                if (send_stats || send_props) {
-                    Variant::Map map_;
-                    Variant::Map values;
-                    Variant::Map oid;
-
-                    object->getObjectId().mapEncode(oid);
-                    map_["_object_id"] = oid;
-                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
-                                                           object->getClassName(),
-                                                           object->getMd5Sum());
-                    object->writeTimestamps(map_);
-                    object->mapEncodeValues(values, send_props, send_stats);
-                    map_["_values"] = values;
-                    list_.push_back(map_);
-
-                    if (++v2Objs >= maxV2ReplyObjs) {
-                        v2Objs = 0;
-                        ListCodec::encode(list_, content);
-
-                        connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
-                        list_.clear();
-                        content.clear();
-                        QPID_LOG(trace, "SENT DataIndication");
+                        if (++v2Objs >= maxV2ReplyObjs) {
+                            v2Objs = 0;
+                            boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+                            ListCodec::encode(list_, item->content);
+                            message_list.push_back(item);
+                            list_.clear();
+                        }
                     }
+
+                    if (object->isDeleted())
+                        deleteList.push_back(iter->first);
+                    object->setForcePublish(false);
                 }
+            }
 
-                if (object->isDeleted())
-                    deleteList.push_back(iter->first);
-                object->setForcePublish(false);
+            if (!list_.empty()) {
+                boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+                ListCodec::encode(list_, item->content);
+                message_list.push_back(item);
             }
         }
 
-        if (!list_.empty()) {
-            ListCodec::encode(list_, content);
-            connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
-            QPID_LOG(trace, "SENT DataIndication");
-        }
+        // Delete flagged objects
+        for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+             iter != deleteList.rend();
+             iter++)
+            managementObjects.erase(*iter);
     }
 
-    // Delete flagged objects
-    for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
-         iter != deleteList.rend();
-         iter++)
-        managementObjects.erase(*iter);
+    while (!message_list.empty()) {
+        boost::shared_ptr<MessageItem> item(message_list.front());
+        message_list.pop_front();
+        connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
+        QPID_LOG(trace, "SENT DataIndication");
+    }
 }
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org