You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/12 21:59:19 UTC
svn commit: r984935 - in /qpid/trunk/qpid/cpp/src/qpid/agent:
ManagementAgentImpl.cpp ManagementAgentImpl.h
Author: kgiusti
Date: Thu Aug 12 19:59:19 2010
New Revision: 984935
URL: http://svn.apache.org/viewvc?rev=984935&view=rev
Log:
QPID-2791: batch up data indications and replies
Modified:
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=984935&r1=984934&r2=984935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Aug 12 19:59:19 2010
@@ -106,6 +106,7 @@ ManagementAgentImpl::ManagementAgentImpl
schemaTimestamp(Duration(EPOCH, now())),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
+ maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter
connThreadBody(*this), connThread(connThreadBody),
pubThreadBody(*this), pubThread(pubThreadBody)
{
@@ -677,6 +678,7 @@ void ManagementAgentImpl::handleGetQuery
if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
packageName = s_iter->second.asString();
+ unsigned int objCount = 0;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++) {
@@ -684,10 +686,9 @@ void ManagementAgentImpl::handleGetQuery
if (object->getClassName() == className &&
(packageName.empty() || object->getPackageName() == packageName)) {
- // @todo support multiple object reply per message
values.clear();
- list_.clear();
oidMap.clear();
+ map_.clear();
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -702,16 +703,20 @@ void ManagementAgentImpl::handleGetQuery
object->getMd5Sum());
list_.push_back(map_);
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ if (++objCount >= maxV2ReplyObjs) {
+ objCount = 0;
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ content.clear();
+ list_.clear();
+ }
}
}
}
}
- // Send empty "non-partial" message to indicate CommandComplete
- list_.clear();
+ // Send last "non-partial" message to indicate CommandComplete
headers.erase("partial");
ListCodec::encode(list_, content);
connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
@@ -971,6 +976,8 @@ void ManagementAgentImpl::periodicProces
if (!connected)
return;
+ sendHeartbeat();
+
moveNewObjectsLH();
//
@@ -991,6 +998,8 @@ void ManagementAgentImpl::periodicProces
//
// Process the entire object map.
//
+ uint32_t v2Objs = 0;
+
for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
baseIter != managementObjects.end();
baseIter++) {
@@ -1010,6 +1019,21 @@ void ManagementAgentImpl::periodicProces
std::string className = baseObject->getClassName();
Variant::List list_;
+ string content;
+ std::stringstream addr_key;
+ Variant::Map headers;
+
+ addr_key << addr_key_base;
+ addr_key << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
@@ -1038,6 +1062,16 @@ void ManagementAgentImpl::periodicProces
object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
list_.push_back(map_);
+
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+ ListCodec::encode(list_, content);
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
+ list_.clear();
+ content.clear();
+ QPID_LOG(trace, "SENT DataIndication");
+ }
}
if (object->isDeleted())
@@ -1046,23 +1080,8 @@ void ManagementAgentImpl::periodicProces
}
}
- string content;
- ListCodec::encode(list_, content);
- if (content.length()) {
- Variant::Map headers;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- std::stringstream addr_key;
- addr_key << addr_key_base;
- addr_key << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
+ if (!list_.empty()) {
+ ListCodec::encode(list_, content);
connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
QPID_LOG(trace, "SENT DataIndication");
}
@@ -1077,7 +1096,6 @@ void ManagementAgentImpl::periodicProces
}
deleteList.clear();
- sendHeartbeat();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=984935&r1=984934&r2=984935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Aug 12 19:59:19 2010
@@ -181,6 +181,10 @@ class ManagementAgentImpl : public Manag
uint32_t assignedAgentBank;
uint16_t bootSequence;
+ // Maximum # of objects allowed in a single V2 response
+ // message.
+ uint32_t maxV2ReplyObjs;
+
static const uint8_t DEBUG_OFF = 0;
static const uint8_t DEBUG_CONN = 1;
static const uint8_t DEBUG_PROTO = 2;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org