You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/12 22:23:31 UTC

svn commit: r909610 - in /qpid/trunk/qpid/cpp: include/qpid/management/ src/qpid/broker/ src/qpid/management/ src/qpid/xml/

Author: tross
Date: Fri Feb 12 21:23:27 2010
New Revision: 909610

URL: http://svn.apache.org/viewvc?rev=909610&view=rev
Log:
Changes needed for QPID-2029 (Clustering and Management don't work well together)
This update changes the indexing of object IDs in the broker-resident management agent
from being based on the QMFv1 format (numeric) to the QMFv2 format (string name).  This removes
the need for numeric objectIds to be synchronized across a set of clustered brokers.

Also included in this patch is a fix to a bug in binding creation.  Previously, when a binding
was created that already existed, the management object for the proposed binding (duplicate of
the existing one) was created then destroyed.  This is inefficient and causes problems when the
name-based indexes collide.

Modified:
    qpid/trunk/qpid/cpp/include/qpid/management/ManagementObject.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h

Modified: qpid/trunk/qpid/cpp/include/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/management/ManagementObject.h?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/management/ManagementObject.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/management/ManagementObject.h Fri Feb 12 21:23:27 2010
@@ -27,12 +27,14 @@
 #include <qpid/framing/Buffer.h>
 #include "qpid/CommonImportExport.h"
 #include <map>
+#include <vector>
 
 namespace qpid {
 namespace management {
 
 class Manageable;
 class ObjectId;
+class ManagementObject;
 
 
 class AgentAttachment {
@@ -65,7 +67,9 @@
     QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; };
     QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const;
     QPID_COMMON_EXTERN void decode(framing::Buffer& buffer);
-    QPID_COMMON_EXTERN void setV2Key(const std::string& key) { v2Key = key; }
+    QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
+    QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
+    QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
     QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; }
     friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&);
 };
@@ -192,6 +196,7 @@
 };
 
 typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
+typedef std::vector<ManagementObject*> ManagementObjectVector;
 
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Fri Feb 12 21:23:27 2010
@@ -77,6 +77,7 @@
         if (exclusiveBinding) bk.queues.clear();
 
         if (bk.queues.add_unless(b, MatchQueue(queue))) {
+            b->startManagement();
             propagate = bk.fedBinding.addOrigin(fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Feb 12 21:23:27 2010
@@ -306,9 +306,23 @@
         (*iter)->propagateBinding(routingKey, tags, op, origin, extra_args);
 }
 
-Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
-                           FieldTable _args, const string& origin)
-    : queue(_queue), key(_key), args(_args), mgmtBinding(0)
+Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
+                           FieldTable _args, const string& _origin)
+    : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+{
+}
+
+Exchange::Binding::~Binding ()
+{
+    if (mgmtBinding != 0) {
+        ManagementObject* mo = queue->GetManagementObject();
+        if (mo != 0)
+            static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
+        mgmtBinding->resourceDestroy ();
+    }
+}
+
+void Exchange::Binding::startManagement()
 {
     if (parent != 0)
     {
@@ -333,16 +347,6 @@
     }
 }
 
-Exchange::Binding::~Binding ()
-{
-    if (mgmtBinding != 0) {
-        ManagementObject* mo = queue->GetManagementObject();
-        if (mo != 0)
-            static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
-        mgmtBinding->resourceDestroy ();
-    }
-}
-
 ManagementObject* Exchange::Binding::GetManagementObject () const
 {
     return (ManagementObject*) mgmtBinding;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Feb 12 21:23:27 2010
@@ -45,14 +45,17 @@
         typedef boost::shared_ptr<Binding>       shared_ptr;
         typedef std::vector<Binding::shared_ptr> vector;
 
+        Exchange*                 parent;
         Queue::shared_ptr         queue;
         const std::string         key;
         const framing::FieldTable args;
+        std::string               origin;
         qmf::org::apache::qpid::broker::Binding* mgmtBinding;
 
         Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
                 framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
         ~Binding();
+        void startManagement();
         management::ManagementObject* GetManagementObject() const;
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Fri Feb 12 21:23:27 2010
@@ -63,6 +63,7 @@
     if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
         Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
         if (bindings.add_unless(binding, MatchQueue(queue))) {
+            binding->startManagement();
             propagate = fedBinding.addOrigin(fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Feb 12 21:23:27 2010
@@ -114,6 +114,7 @@
             Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
             BoundKey bk(binding);
             if (bindings.add_unless(bk, MatchArgs(queue, args))) {
+                binding->startManagement();
                 propagate = bk.fedBinding.addOrigin(fedOrigin);
                 if (mgmtExchange != 0) {
                     mgmtExchange->inc_bindingCount();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Feb 12 21:23:27 2010
@@ -207,6 +207,7 @@
             return false;
         } else {
             Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+            binding->startManagement();
             BoundKey& bk = bindings[routingPattern];
             bk.bindingVector.push_back(binding);
             propagate = bk.fedBinding.addOrigin(fedOrigin);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Feb 12 21:23:27 2010
@@ -85,6 +85,12 @@
             delete object;
         }
         managementObjects.clear();
+
+        while (!deletedManagementObjects.empty()) {
+            ManagementObject* object = deletedManagementObjects.back();
+            delete object;
+            deletedManagementObjects.pop_back();
+        }
     }
 }
 
@@ -196,9 +202,20 @@
     }
 
     ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
-    objId.setV2Key(object->getKey());
+    objId.setV2Key(*object);
 
     object->setObjectId(objId);
+    ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+    if (destIter != newManagementObjects.end()) {
+        if (destIter->second->isDeleted()) {
+            newDeletedManagementObjects.push_back(destIter->second);
+            newManagementObjects.erase(destIter);
+        } else {
+            QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+                     " key=" << objId.getV2Key());
+            return objId;
+        }
+    }
     newManagementObjects[objId] = object;
 
     if (publishNow) {
@@ -344,9 +361,31 @@
     Mutex::ScopedLock lock (addLock);
     for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
          iter != newManagementObjects.end ();
-         iter++)
-        managementObjects[iter->first] = iter->second;
+         iter++) {
+        bool skip = false;
+        ManagementObjectMap::iterator destIter = managementObjects.find(iter->first);
+        if (destIter != managementObjects.end()) {
+            // We have an objectId collision with an existing object.  If the old object
+            // is deleted, move it to the deleted list.
+            if (destIter->second->isDeleted()) {
+                deletedManagementObjects.push_back(destIter->second);
+                managementObjects.erase(destIter);
+            } else {
+                QPID_LOG(error, "ObjectId collision in moveNewObjects. class=" <<
+                         iter->second->getClassName() << " key=" << iter->first.getV2Key());
+                skip = true;
+            }
+        }
+
+        if (!skip)
+            managementObjects[iter->first] = iter->second;
+    }
     newManagementObjects.clear();
+
+    while (!newDeletedManagementObjects.empty()) {
+        deletedManagementObjects.push_back(newDeletedManagementObjects.back());
+        newDeletedManagementObjects.pop_back();
+    }
 }
 
 void ManagementAgent::periodicProcessing (void)
@@ -449,7 +488,23 @@
         managementObjects.erase(iter->first);
     }
 
-    if (!deleteList.empty()) {
+    // Publish the deletion of objects created by insert-collision
+    bool collisionDeletions = false;
+    for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
+         cdIter != deletedManagementObjects.end(); cdIter++) {
+        collisionDeletions = true;
+        Buffer msgBuffer(msgChars, BUFSIZE);
+        encodeHeader(msgBuffer, 'c');
+        (*cdIter)->writeProperties(msgBuffer);
+        contentSize = BUFSIZE - msgBuffer.available ();
+        msgBuffer.reset ();
+        stringstream key;
+        key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+        sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+        QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+    }
+
+    if (!deleteList.empty() || collisionDeletions) {
         deleteList.clear();
         deleteOrphanedAgentsLH();
     }
@@ -596,7 +651,7 @@
         }
     }
 
-    ManagementObjectMap::iterator iter = managementObjects.find(objId);
+    ManagementObjectMap::iterator iter = numericFind(objId);
     if (iter == managementObjects.end() || iter->second->isDeleted()) {
         outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
         outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
@@ -967,7 +1022,7 @@
             return;
 
         ObjectId selector(value->get<string>());
-        ManagementObjectMap::iterator iter = managementObjects.find(selector);
+        ManagementObjectMap::iterator iter = numericFind(selector);
         if (iter != managementObjects.end()) {
             ManagementObject* object = iter->second;
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1294,6 +1349,18 @@
     return end - start;
 }
 
+ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
+{
+    ManagementObjectMap::iterator iter = managementObjects.begin();
+    for (; iter != managementObjects.end(); iter++) {
+        if (oid.equalV1(iter->first))
+            break;
+    }
+
+    return iter;
+}
+
+
 void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
 {
     Mutex::ScopedLock lock (addLock);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Feb 12 21:23:27 2010
@@ -217,8 +217,18 @@
 
     RemoteAgentMap               remoteAgents;
     PackageMap                   packages;
+
+    //
+    // Protected by userLock
+    //
     ManagementObjectMap          managementObjects;
+    ManagementObjectVector       deletedManagementObjects;
+
+    //
+    // Protected by addLock
+    //
     ManagementObjectMap          newManagementObjects;
+    ManagementObjectVector       newDeletedManagementObjects;
 
     framing::Uuid                uuid;
     sys::Mutex                   addLock;
@@ -295,6 +305,7 @@
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);
     size_t validateEventSchema(framing::Buffer&);
+    ManagementObjectMap::iterator numericFind(const ObjectId& oid);
     void debugSnapshot(const char*);
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Fri Feb 12 21:23:27 2010
@@ -109,16 +109,18 @@
 
 bool ObjectId::operator==(const ObjectId &other) const
 {
-    uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
-
-    return first == otherFirst && second == other.second;
+    return v2Key == other.v2Key;
 }
 
 bool ObjectId::operator<(const ObjectId &other) const
 {
-    uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+    return v2Key < other.v2Key;
+}
 
-    return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+bool ObjectId::equalV1(const ObjectId &other) const
+{
+    uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+    return first == otherFirst && second == other.second;
 }
 
 void ObjectId::encode(framing::Buffer& buffer) const
@@ -136,6 +138,14 @@
     second = buffer.getLongLong();
 }
 
+void ObjectId::setV2Key(const ManagementObject& object)
+{
+    std::stringstream oname;
+    oname << object.getPackageName() << "." << object.getClassName() << ":" << object.getKey();
+    v2Key = oname.str();
+}
+
+
 namespace qpid {
 namespace management {
 

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=909610&r1=909609&r2=909610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Fri Feb 12 21:23:27 2010
@@ -53,7 +53,7 @@
 		   const ::qpid::framing::FieldTable& _arguments, Query query):
             Binding(key, queue, parent, _arguments),
 	      xquery(query),
-	      parse_message_content(true) {}
+                parse_message_content(true) { startManagement(); }
     };
 
         



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org