You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/10/22 15:42:29 UTC
svn commit: r828685 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/Connection.cpp management/ManagementAgent.cpp
management/ManagementAgent.h
Author: tross
Date: Thu Oct 22 13:42:29 2009
New Revision: 828685
URL: http://svn.apache.org/viewvc?rev=828685&view=rev
Log:
Added immediate-publish for new connections and agents. This solves a race condition where
a QMF console may learn about an object before it learns about the agent that controls that
object.
Changed log category for QMF messages from debug to trace.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Oct 22 13:42:29 2009
@@ -97,7 +97,7 @@
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
- agent->addObject(mgmtObject, objectId);
+ agent->addObject(mgmtObject, objectId, true);
}
ConnectionState::setUrl(mgmtId);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Oct 22 13:42:29 2009
@@ -46,7 +46,7 @@
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy();
- QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+ QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
}
ManagementAgent::ManagementAgent () :
@@ -170,8 +170,9 @@
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
-ObjectId ManagementAgent::addObject (ManagementObject* object,
- uint64_t persistId)
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+ uint64_t persistId,
+ bool publishNow)
{
Mutex::ScopedLock lock (addLock);
uint16_t sequence;
@@ -189,6 +190,22 @@
object->setObjectId(objId);
newManagementObjects[objId] = object;
+
+ if (publishNow) {
+#define IMM_BUFSIZE 65536
+ char rawBuf[IMM_BUFSIZE];
+ Buffer msgBuffer(rawBuf, IMM_BUFSIZE);
+
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ uint32_t contentSize = msgBuffer.getPosition();
+ stringstream key;
+ key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ msgBuffer.reset();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
+ }
+
return objId;
}
@@ -240,7 +257,7 @@
outLen = outBuffer.getPosition();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey);
- QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
+ QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
}
}
@@ -396,7 +413,7 @@
stringstream key;
key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -422,7 +439,7 @@
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
+ QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
}
@@ -438,7 +455,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+ QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
@@ -495,7 +512,7 @@
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- QPID_LOG(debug, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -507,7 +524,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
return;
}
@@ -523,7 +540,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
return;
}
}
@@ -553,7 +570,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
}
void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -561,7 +578,7 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
+ QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
@@ -569,12 +586,12 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
+ QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
}
void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
{
- QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
+ QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -588,7 +605,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
}
sendCommandComplete (replyToKey, sequence);
@@ -600,7 +617,7 @@
inBuffer.getShortString(packageName);
- QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
findOrAddPackageLH(packageName);
}
@@ -611,7 +628,7 @@
inBuffer.getShortString(packageName);
- QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end())
@@ -631,7 +648,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
+ QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
"(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence);
}
}
@@ -649,7 +666,7 @@
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -666,7 +683,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
if (cIter != pIter->second.end())
@@ -697,7 +714,7 @@
inBuffer.getShortString (key.name);
inBuffer.getBin128 (key.hash);
- QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
@@ -715,7 +732,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
}
else
sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -739,7 +756,7 @@
inBuffer.getBin128(key.hash);
inBuffer.restore();
- QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -764,7 +781,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, mExchange, "schema.class");
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+ QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
}
@@ -849,7 +866,7 @@
requestedBrokerBank = inBuffer.getLong();
requestedAgentBank = inBuffer.getLong();
- QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+ QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
" reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
assignedBank = assignBankLH(requestedAgentBank);
@@ -866,11 +883,11 @@
agent->mgmtObject->set_systemId (systemId);
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
- addObject (agent->mgmtObject);
+ addObject (agent->mgmtObject, 0, true);
remoteAgents[connectionRef] = agent;
- QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+ QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -882,7 +899,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+ QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
@@ -895,7 +912,7 @@
ft.decode(inBuffer);
- QPID_LOG(debug, "RECV GetQuery query=" << ft << " seq=" << sequence);
+ QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -920,7 +937,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandComplete(replyToKey, sequence);
@@ -947,7 +964,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
}
@@ -1011,7 +1028,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
}
return false;
@@ -1083,7 +1100,7 @@
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package")
+ QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
return result.first;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Oct 22 13:42:29 2009
@@ -80,7 +80,8 @@
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0);
+ uint64_t persistId = 0,
+ bool publishNow = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org