You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/10/22 15:42:29 UTC

svn commit: r828685 - in /qpid/trunk/qpid/cpp/src/qpid: broker/Connection.cpp management/ManagementAgent.cpp management/ManagementAgent.h

Author: tross
Date: Thu Oct 22 13:42:29 2009
New Revision: 828685

URL: http://svn.apache.org/viewvc?rev=828685&view=rev
Log:
Added immediate-publish for new connections and agents.  This solves a race condition where
a QMF console may learn about an object before it learns about the agent that controls that
object.

Changed log category for QMF messages from debug to trace.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Oct 22 13:42:29 2009
@@ -97,7 +97,7 @@
         // TODO set last bool true if system connection
         if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
-            agent->addObject(mgmtObject, objectId);
+            agent->addObject(mgmtObject, objectId, true);
         }
         ConnectionState::setUrl(mgmtId);
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Oct 22 13:42:29 2009
@@ -46,7 +46,7 @@
 {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy();
-    QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+    QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
 }
 
 ManagementAgent::ManagementAgent () :
@@ -170,8 +170,9 @@
     addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
 }
 
-ObjectId ManagementAgent::addObject (ManagementObject* object,
-                                      uint64_t          persistId)
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+                                    uint64_t          persistId,
+                                    bool              publishNow)
 {
     Mutex::ScopedLock lock (addLock);
     uint16_t sequence;
@@ -189,6 +190,22 @@
 
     object->setObjectId(objId);
     newManagementObjects[objId] = object;
+
+    if (publishNow) {
+#define IMM_BUFSIZE 65536
+        char rawBuf[IMM_BUFSIZE];
+        Buffer msgBuffer(rawBuf, IMM_BUFSIZE);
+
+        encodeHeader(msgBuffer, 'c');
+        object->writeProperties(msgBuffer);
+        uint32_t contentSize = msgBuffer.getPosition();
+        stringstream key;
+        key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+        msgBuffer.reset();
+        sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+        QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
+    }
+
     return objId;
 }
 
@@ -240,7 +257,7 @@
         outLen = outBuffer.getPosition();
         outBuffer.reset();
         sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey);
-        QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
+        QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
     }
 }
 
@@ -396,7 +413,7 @@
             stringstream key;
             key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
             sendBuffer(msgBuffer, contentSize, mExchange, key.str());
-            QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+            QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
         }
     }
 
@@ -422,7 +439,7 @@
         msgBuffer.reset ();
         routingKey = "console.heartbeat.1.0";
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
-        QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
+        QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
 }
 
@@ -438,7 +455,7 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBuffer (outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+    QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
              replyToKey << " seq=" << sequence);
 }
 
@@ -495,7 +512,7 @@
     inBuffer.getBin128(hash);
     inBuffer.getShortString(methodName);
 
-    QPID_LOG(debug, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+    QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
              methodName << " replyTo=" << replyToKey);
 
     encodeHeader(outBuffer, 'm', sequence);
@@ -507,7 +524,7 @@
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
         sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
+        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
         return;
     }
 
@@ -523,7 +540,7 @@
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
             return;
         }
     }
@@ -553,7 +570,7 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
     sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(debug, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
 }
 
 void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -561,7 +578,7 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
+    QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
 
     encodeHeader (outBuffer, 'b', sequence);
     uuid.encode  (outBuffer);
@@ -569,12 +586,12 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBuffer (outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
+    QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
 }
 
 void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
 {
-    QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
+    QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
 
     for (PackageMap::iterator pIter = packages.begin ();
          pIter != packages.end ();
@@ -588,7 +605,7 @@
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
         sendBuffer (outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(debug, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
+        QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
     }
 
     sendCommandComplete (replyToKey, sequence);
@@ -600,7 +617,7 @@
 
     inBuffer.getShortString(packageName);
 
-    QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
 
     findOrAddPackageLH(packageName);
 }
@@ -611,7 +628,7 @@
 
     inBuffer.getShortString(packageName);
 
-    QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end())
@@ -631,7 +648,7 @@
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
+                QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
                          "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence);
             }
         }
@@ -649,7 +666,7 @@
     inBuffer.getShortString(key.name);
     inBuffer.getBin128(key.hash);
 
-    QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+    QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
              "), replyTo=" << replyToKey);
 
     PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -666,7 +683,7 @@
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
         sendBuffer (outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+        QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
                  "), to=" << replyToKey << " seq=" << sequence);
 
         if (cIter != pIter->second.end())
@@ -697,7 +714,7 @@
     inBuffer.getShortString (key.name);
     inBuffer.getBin128      (key.hash);
 
-    QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+    QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
              "), replyTo=" << replyToKey << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
@@ -715,7 +732,7 @@
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
             }
             else
                 sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -739,7 +756,7 @@
     inBuffer.getBin128(key.hash);
     inBuffer.restore();
 
-    QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+    QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end()) {
@@ -764,7 +781,7 @@
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
                 sendBuffer(outBuffer, outLen, mExchange, "schema.class");
-                QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+                QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
                          " to=schema.class");
             }
         }
@@ -849,7 +866,7 @@
     requestedBrokerBank = inBuffer.getLong();
     requestedAgentBank  = inBuffer.getLong();
 
-    QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+    QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
              " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
 
     assignedBank = assignBankLH(requestedAgentBank);
@@ -866,11 +883,11 @@
     agent->mgmtObject->set_systemId     (systemId);
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
-    addObject (agent->mgmtObject);
+    addObject (agent->mgmtObject, 0, true);
 
     remoteAgents[connectionRef] = agent;
 
-    QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+    QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
 
     // Send an Attach Response
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -882,7 +899,7 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBuffer (outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+    QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
              " to=" << replyToKey << " seq=" << sequence);
 }
 
@@ -895,7 +912,7 @@
 
     ft.decode(inBuffer);
 
-    QPID_LOG(debug, "RECV GetQuery query=" << ft << " seq=" << sequence);
+    QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
 
     value = ft.get("_class");
     if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -920,7 +937,7 @@
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
             }
         }
         sendCommandComplete(replyToKey, sequence);
@@ -947,7 +964,7 @@
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
             }
         }
     }
@@ -1011,7 +1028,7 @@
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
         }
 
         return false;
@@ -1083,7 +1100,7 @@
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     sendBuffer (outBuffer, outLen, mExchange, "schema.package");
-    QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package")
+    QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
 
     return result.first;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=828685&r1=828684&r2=828685&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Oct 22 13:42:29 2009
@@ -80,7 +80,8 @@
                                              uint8_t*    md5Sum,
                                              ManagementObject::writeSchemaCall_t schemaCall);
     QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject* object,
-                                             uint64_t          persistId = 0);
+                                             uint64_t          persistId = 0,
+                                             bool              publishNow = false);
     QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org