You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/12 21:59:19 UTC

svn commit: r984935 - in /qpid/trunk/qpid/cpp/src/qpid/agent: ManagementAgentImpl.cpp ManagementAgentImpl.h

Author: kgiusti
Date: Thu Aug 12 19:59:19 2010
New Revision: 984935

URL: http://svn.apache.org/viewvc?rev=984935&view=rev
Log:
QPID-2791: batch up data indications and replies

Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=984935&r1=984934&r2=984935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Aug 12 19:59:19 2010
@@ -106,6 +106,7 @@ ManagementAgentImpl::ManagementAgentImpl
     schemaTimestamp(Duration(EPOCH, now())),
     clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
     assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
+    maxV2ReplyObjs(10),   // KAG todo: make this a tuneable parameter
     connThreadBody(*this), connThread(connThreadBody),
     pubThreadBody(*this), pubThread(pubThreadBody)
 {
@@ -677,6 +678,7 @@ void ManagementAgentImpl::handleGetQuery
                 if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
                     packageName = s_iter->second.asString();
 
+                unsigned int objCount = 0;
                 for (ManagementObjectMap::iterator iter = managementObjects.begin();
                      iter != managementObjects.end();
                      iter++) {
@@ -684,10 +686,9 @@ void ManagementAgentImpl::handleGetQuery
                     if (object->getClassName() == className &&
                         (packageName.empty() || object->getPackageName() == packageName)) {
 
-                        // @todo support multiple object reply per message
                         values.clear();
-                        list_.clear();
                         oidMap.clear();
+                        map_.clear();
 
                         if (object->getConfigChanged() || object->getInstChanged())
                             object->setUpdateTime();
@@ -702,16 +703,20 @@ void ManagementAgentImpl::handleGetQuery
                                                                object->getMd5Sum());
                         list_.push_back(map_);
 
-                        ListCodec::encode(list_, content);
-                        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-                        QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+                        if (++objCount >= maxV2ReplyObjs) {
+                            objCount = 0;
+                            ListCodec::encode(list_, content);
+                            connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+                            QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+                            content.clear();
+                            list_.clear();
+                        }
                     }
                 }
             }
         }
 
-        // Send empty "non-partial" message to indicate CommandComplete
-        list_.clear();
+        // Send last "non-partial" message to indicate CommandComplete
         headers.erase("partial");
         ListCodec::encode(list_, content);
         connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
@@ -971,6 +976,8 @@ void ManagementAgentImpl::periodicProces
     if (!connected)
         return;
 
+    sendHeartbeat();
+
     moveNewObjectsLH();
 
     //
@@ -991,6 +998,8 @@ void ManagementAgentImpl::periodicProces
     //
     //  Process the entire object map.
     //
+    uint32_t v2Objs = 0;
+
     for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
          baseIter != managementObjects.end();
          baseIter++) {
@@ -1010,6 +1019,21 @@ void ManagementAgentImpl::periodicProces
         std::string className = baseObject->getClassName();
 
         Variant::List list_;
+        string content;
+        std::stringstream addr_key;
+        Variant::Map  headers;
+
+        addr_key << addr_key_base;
+        addr_key << keyifyNameStr(packageName)
+                 << "." << keyifyNameStr(className)
+                 << "." << vendorNameKey
+                 << "." << productNameKey
+                 << "." << instanceNameKey;
+
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_data";
+        headers["qmf.agent"] = name_address;
 
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
@@ -1038,6 +1062,16 @@ void ManagementAgentImpl::periodicProces
                     object->mapEncodeValues(values, send_props, send_stats);
                     map_["_values"] = values;
                     list_.push_back(map_);
+
+                    if (++v2Objs >= maxV2ReplyObjs) {
+                        v2Objs = 0;
+                        ListCodec::encode(list_, content);
+
+                        connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
+                        list_.clear();
+                        content.clear();
+                        QPID_LOG(trace, "SENT DataIndication");
+                    }
                 }
 
                 if (object->isDeleted())
@@ -1046,23 +1080,8 @@ void ManagementAgentImpl::periodicProces
             }
         }
 
-        string content;
-        ListCodec::encode(list_, content);
-        if (content.length()) {
-            Variant::Map  headers;
-            headers["method"] = "indication";
-            headers["qmf.opcode"] = "_data_indication";
-            headers["qmf.content"] = "_data";
-            headers["qmf.agent"] = name_address;
-
-            std::stringstream addr_key;
-            addr_key << addr_key_base;
-            addr_key << keyifyNameStr(packageName)
-                     << "." << keyifyNameStr(className)
-                     << "." << vendorNameKey
-                     << "." << productNameKey
-                     << "." << instanceNameKey;
-
+        if (!list_.empty()) {
+            ListCodec::encode(list_, content);
             connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
             QPID_LOG(trace, "SENT DataIndication");
         }
@@ -1077,7 +1096,6 @@ void ManagementAgentImpl::periodicProces
     }
 
     deleteList.clear();
-    sendHeartbeat();
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=984935&r1=984934&r2=984935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Aug 12 19:59:19 2010
@@ -181,6 +181,10 @@ class ManagementAgentImpl : public Manag
     uint32_t          assignedAgentBank;
     uint16_t          bootSequence;
 
+    // Maximum number of objects batched into a single V2 message
+    // (query response or data indication).
+    uint32_t maxV2ReplyObjs;
+
     static const uint8_t DEBUG_OFF     = 0;
     static const uint8_t DEBUG_CONN    = 1;
     static const uint8_t DEBUG_PROTO   = 2;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org