You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/04/16 21:03:37 UTC

svn commit: r935044 - in /qpid/trunk/qpid/cpp/src/qpid: agent/ManagementAgentImpl.cpp broker/Message.cpp broker/Message.h broker/MessageAdapter.cpp broker/MessageAdapter.h management/ManagementAgent.cpp

Author: tross
Date: Fri Apr 16 19:03:36 2010
New Revision: 935044

URL: http://svn.apache.org/viewvc?rev=935044&view=rev
Log:
Fixed problems with the broker's QMFv2 mode:
  - app_id is a message property, not an application header
  - even in v2 mode, the agent must respond to v1 schema requests
  - missing object_id was added to the periodic data updates
  - epoch/boot-sequence added to agent information (locate-response, heartbeat)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Apr 16 19:03:36 2010
@@ -374,6 +374,7 @@ void ManagementAgentImpl::sendHeartbeat(
     map["_values"] = attrMap;
     map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
     map["_values"].asMap()["heartbeat_interval"] = interval;
+    map["_values"].asMap()["epoch"] = bootSequence;
 
     MapCodec::encode(map, content);
     connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
@@ -673,6 +674,7 @@ void ManagementAgentImpl::handleLocateRe
     map["_values"] = attrMap;
     map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
     map["_values"].asMap()["heartbeat_interval"] = interval;
+    map["_values"].asMap()["epoch"] = bootSequence;
 
     MapCodec::encode(map, content);
     connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Apr 16 19:03:36 2010
@@ -99,6 +99,11 @@ const FieldTable* Message::getApplicatio
     return getAdapter().getApplicationHeaders(frames);
 }
 
+std::string Message::getAppId() const  // Returns the app id that the message adapter reports for `frames`.
+{
+    return getAdapter().getAppId(frames);  // pure delegation; no processing is done at this level
+}
+
 bool Message::isPersistent() const
 {
     return (getAdapter().isPersistent(frames) || forcePersistentPolicy);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Apr 16 19:03:36 2010
@@ -73,6 +73,7 @@ public:
     QPID_BROKER_EXTERN std::string getExchangeName() const;
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
+    QPID_BROKER_EXTERN std::string getAppId() const;
     framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Fri Apr 16 19:03:36 2010
@@ -72,4 +72,10 @@ namespace broker{
         const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
         return p ? p->getPriority() : 0;
     }
+
+    std::string TransferAdapter::getAppId(const framing::FrameSet& f)  // Returns the app id from f's MessageProperties, or `empty` if there are none.
+    {
+        const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();  // p is null-checked on the next line
+        return p ? p->getAppId() : empty;  // `empty` is declared outside this hunk -- presumably an empty std::string; confirm
+    }
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Fri Apr 16 19:03:36 2010
@@ -41,6 +41,7 @@ struct MessageAdapter
     virtual bool isPersistent(const framing::FrameSet& f) = 0;
     virtual bool requiresAccept(const framing::FrameSet& f) = 0;    
     virtual uint8_t getPriority(const framing::FrameSet& f) = 0;
+    virtual std::string getAppId(const framing::FrameSet& f) = 0;
 };
 
 struct TransferAdapter : MessageAdapter
@@ -52,6 +53,7 @@ struct TransferAdapter : MessageAdapter
     bool isImmediate(const framing::FrameSet&);
     bool requiresAccept(const framing::FrameSet& f);    
     uint8_t getPriority(const framing::FrameSet& f);
+    virtual std::string getAppId(const framing::FrameSet& f);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=935044&r1=935043&r2=935044&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Apr 16 19:03:36 2010
@@ -667,7 +667,10 @@ void ManagementAgent::periodicProcessing
                 if ((send_stats || send_props) && qmf2Support) {
                     Variant::Map  map_;
                     Variant::Map values;
+                    Variant::Map oid;
 
+                    object->getObjectId().mapEncode(oid);
+                    map_["_object_id"] = oid;
                     map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
                                                            object->getClassName(),
                                                            "_data",
@@ -675,7 +678,6 @@ void ManagementAgent::periodicProcessing
                     object->mapEncodeValues(values, send_props, send_stats);
                     map_["_values"] = values;
                     list_.push_back(map_);
-
                 }
 
                 if (send_props) pcount++;
@@ -698,7 +700,7 @@ void ManagementAgent::periodicProcessing
                     stringstream key;
                     key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
                     sendBufferLH(msgBuffer, contentSize, mExchange, key.str());
-                    QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+                    QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
                 }
             }
 
@@ -815,6 +817,7 @@ void ManagementAgent::periodicProcessing
         map["_values"] = attrMap;
         map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
         map["_values"].asMap()["heartbeat_interval"] = interval;
+        map["_values"].asMap()["epoch"] = bootSequence;
 
         string content;
         MapCodec::encode(map, content);
@@ -906,8 +909,7 @@ bool ManagementAgent::dispatchCommand (D
 {
     sys::Mutex::ScopedLock lock (userLock);
     Message&  msg = ((DeliverableMessage&) deliverable).getMessage ();
-
-    if (qmf1Support && topic) {
+    if (topic) {
 
         // qmf1 is bound only to the topic management exchange.
         // Parse the routing key.  This management broker should act as though it
@@ -943,11 +945,9 @@ bool ManagementAgent::dispatchCommand (D
     if (qmf2Support) {
 
         if (topic) {
-
             // Intercept messages bound to:
             //  "console.request.agent_locate" - process these messages, and also allow them to be forwarded.
-
-            if (routingKey.compare(0, 18, "console.ind.locate") == 0) {
+            if (routingKey == "console.request.agent_locate") {
                 dispatchAgentCommandLH(msg);
                 return true;
             }
@@ -1704,10 +1704,12 @@ void ManagementAgent::handleLocateReques
     map["_values"] = attrMap;
     map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
     map["_values"].asMap()["heartbeat_interval"] = interval;
+    map["_values"].asMap()["epoch"] = bootSequence;
 
     string content;
     MapCodec::encode(map, content);
     sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+    clientWasAdded = true;
 
     QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
 }
@@ -1736,7 +1738,7 @@ bool ManagementAgent::authorizeAgentMess
 
     const framing::FieldTable *headers = msg.getApplicationHeaders();
 
-    if (headers && headers->getAsString("app_id") == "qmf2")
+    if (headers && msg.getAppId() == "qmf2")
     {
         mapMsg = true;
 
@@ -1874,7 +1876,6 @@ bool ManagementAgent::authorizeAgentMess
 void ManagementAgent::dispatchAgentCommandLH(Message& msg)
 {
     string   replyToKey;
-
     const framing::MessageProperties* p =
         msg.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
@@ -1898,14 +1899,12 @@ void ManagementAgent::dispatchAgentComma
     inBuffer.reset();
 
     const framing::FieldTable *headers = msg.getApplicationHeaders();
-
-    if (headers && headers->getAsString("app_id") == "qmf2")
+    if (headers && msg.getAppId() == "qmf2")
     {
         std::string opcode = headers->getAsString("qmf.opcode");
         std::string contentType = headers->getAsString("qmf.content");
         std::string body;
         std::string cid;
-
         inBuffer.getRawData(body, bufferLen);
 
         if (p && p->hasCorrelationId()) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org