You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/11 18:23:20 UTC

svn commit: r897955 - in /qpid/trunk/qpid/cpp: include/qpid/framing/ src/qpid/cluster/ src/qpid/management/ xml/

Author: aconway
Date: Mon Jan 11 17:23:18 2010
New Revision: 897955

URL: http://svn.apache.org/viewvc?rev=897955&view=rev
Log:
Fix broker crash "confirmed N but only sent M" with managed agents running.

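The broker's ManagementAgent caches schemas from managed agents.
This cache was not being replicated to new cluster members.
It is now exported by the updating broker and imported by the joining
broker as part of the cluster update.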

If an agent such as sesame was running and connected to a newly-joined
broker, that broker could send schema request messages which were not
sent by other brokers that had the schema in cache. This resulted in
the other brokers exiting with a "confirmed N but only sent M"
message.

Modified:
    qpid/trunk/qpid/cpp/include/qpid/framing/Buffer.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/include/qpid/framing/Buffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/framing/Buffer.h?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/framing/Buffer.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/framing/Buffer.h Mon Jan 11 17:23:18 2010
@@ -43,9 +43,8 @@
     uint32_t position;
     uint32_t r_position;
 
-    void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); }
-
   public:
+    void checkAvailable(uint32_t count) { if (position + count > size) throw OutOfBounds(); }
 
     /** Buffer input/output iterator.
      * Supports using an amqp_0_10::Codec with a framing::Buffer.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jan 11 17:23:18 2010
@@ -175,7 +175,7 @@
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 884125;
+const uint32_t Cluster::CLUSTER_VERSION = 896973;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Jan 11 17:23:18 2010
@@ -40,6 +40,7 @@
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
 
 #include <boost/current_function.hpp>
 
@@ -478,5 +479,14 @@
     findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
 }
 
+void Connection::managementSchema(const std::string& data) {
+    management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+    if (!agent)
+        throw Exception(QPID_MSG("Management schema update but no management agent."));
+    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+    agent->importSchemas(buf);
+    QPID_LOG(debug, cluster << " updated management schemas");
+}
+
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Jan 11 17:23:18 2010
@@ -167,6 +167,7 @@
     OutputInterceptor& getOutput() { return output; }
 
     void addQueueListener(const std::string& queue, uint32_t listener);
+    void managementSchema(const std::string& data);
 
   private:
     struct NullFrameHandler : public framing::FrameHandler {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Jan 11 17:23:18 2010
@@ -138,10 +138,21 @@
     session.queueDelete(arg::queue=UPDATE);
     session.close();
 
-    // Update queue listeners: must come after sessions so consumerNumbering is populated.
+    // Update queue listeners: must come after sessions so consumerNumbering is populated
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
 
     ClusterConnectionProxy(session).expiryId(expiry.getId());
+
+    // FIXME aconway 2010-01-08: we should enforce that all cluster members 
+    // have mgmt enabled or none of them do.
+
+    management::ManagementAgent* agent = updaterBroker.getManagementAgent();
+    if (agent) {
+        string schemaData;
+        agent->exportSchemas(schemaData);
+        ClusterConnectionProxy(session).managementSchema(schemaData);
+    }
+
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Jan 11 17:23:18 2010
@@ -706,8 +706,7 @@
 
         encodeHeader (outBuffer, 'S', sequence);
         outBuffer.putShortString(packageName);
-        outBuffer.putShortString(key.name);
-        outBuffer.putBin128(key.hash);
+        key.encode(outBuffer);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
         sendBuffer (outBuffer, outLen, dExchange, replyToKey);
@@ -730,7 +729,7 @@
     if (writeSchemaCall != 0)
         writeSchemaCall(buf);
     else
-        buf.putRawData(buffer, bufferLen);
+        buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
 }
 
 void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -739,8 +738,7 @@
     SchemaClassKey key;
 
     inBuffer.getShortString (packageName);
-    inBuffer.getShortString (key.name);
-    inBuffer.getBin128      (key.hash);
+    key.decode(inBuffer);
 
     QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
              "), replyTo=" << replyToKey << " seq=" << sequence);
@@ -780,8 +778,7 @@
     inBuffer.record();
     inBuffer.getOctet();
     inBuffer.getShortString(packageName);
-    inBuffer.getShortString(key.name);
-    inBuffer.getBin128(key.hash);
+    key.decode(inBuffer);
     inBuffer.restore();
 
     QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
@@ -796,9 +793,8 @@
                 QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
                 cMap.erase(key);
             } else {
-                cIter->second.buffer    = (uint8_t*) malloc(length);
-                cIter->second.bufferLen = length;
-                inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
+                cIter->second.data.resize(length);
+                inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
 
                 // Publish a class-indication message
                 Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
@@ -1171,8 +1167,7 @@
 
     buf.putOctet((*cIter).second.kind);
     buf.putShortString((*pIter).first);
-    buf.putShortString(key.name);
-    buf.putBin128(key.hash);
+    key.encode(buf);
 }
 
 size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
@@ -1293,3 +1288,72 @@
 void ManagementAgent::disallow(const std::string& className, const std::string& methodName, const std::string& message) {
     disallowed[std::make_pair(className, methodName)] = message;
 }
+
+void ManagementAgent::SchemaClassKey::encode(framing::Buffer& buffer) const {
+    buffer.checkAvailable(encodedSize());
+    buffer.putShortString(name);
+    buffer.putBin128(hash);
+}
+
+void ManagementAgent::SchemaClassKey::decode(framing::Buffer& buffer) {
+    buffer.checkAvailable(encodedSize());
+    buffer.getShortString(name);
+    buffer.getBin128(hash);
+}
+
+uint32_t ManagementAgent::SchemaClassKey::encodedSize() const {
+    return 1 + name.size() + 16 /* bin128 */;
+}
+
+void ManagementAgent::SchemaClass::encode(framing::Buffer& outBuf) const {
+    outBuf.checkAvailable(encodedSize());
+    outBuf.putOctet(kind);
+    outBuf.putLong(pendingSequence);
+    outBuf.putLongString(data);
+}
+
+void ManagementAgent::SchemaClass::decode(framing::Buffer& inBuf) {
+    inBuf.checkAvailable(encodedSize());
+    kind = inBuf.getOctet();
+    pendingSequence = inBuf.getLong();
+    inBuf.getLongString(data);
+}
+
+uint32_t ManagementAgent::SchemaClass::encodedSize() const {
+    return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size();
+}
+
+void ManagementAgent::exportSchemas(std::string& out) {
+    out.clear();
+    for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
+        string name = i->first;
+        const ClassMap& classes = i ->second;
+        for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
+            const SchemaClassKey& key = j->first;
+            const SchemaClass& klass = j->second;
+            if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
+                // Encode name, schema-key, schema-class
+                size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize();
+                size_t end = out.size();
+                out.resize(end + encodedSize);
+                framing::Buffer outBuf(&out[end], encodedSize);
+                outBuf.putShortString(name);
+                key.encode(outBuf);
+                klass.encode(outBuf);
+            }
+        }
+    }
+}
+
+void ManagementAgent::importSchemas(framing::Buffer& inBuf) {
+    while (inBuf.available()) {
+        string package;
+        SchemaClassKey key;
+        SchemaClass klass;
+        inBuf.getShortString(package);
+        key.decode(inBuf);
+        klass.decode(inBuf);
+        packages[package][key] = klass;
+    }
+}
+

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Jan 11 17:23:18 2010
@@ -97,7 +97,13 @@
 
     /** Disallow a method. Attempts to call it will receive an exception with message. */
     void disallow(const std::string& className, const std::string& methodName, const std::string& message);
-                  
+
+    /** Serialize my schemas as a binary blob into schemaOut */
+    void exportSchemas(std::string& schemaOut);
+
+    /** Decode serialized schemas and add them to my schema cache */
+    void importSchemas(framing::Buffer& inBuf);
+
 private:
     struct Periodic : public qpid::sys::TimerTask
     {
@@ -140,6 +146,10 @@
     {
         std::string name;
         uint8_t     hash[16];
+
+        void encode(framing::Buffer& buffer) const;
+        void decode(framing::Buffer& buffer);
+        uint32_t encodedSize() const;
     };
 
     struct SchemaClassKeyComp
@@ -156,20 +166,24 @@
         }
     };
 
+
     struct SchemaClass
     {
         uint8_t  kind;
         ManagementObject::writeSchemaCall_t writeSchemaCall;
+        std::string data;
         uint32_t pendingSequence;
-        size_t   bufferLen;
-        uint8_t* buffer;
 
-        SchemaClass(uint8_t _kind, uint32_t seq) :
-            kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+        SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
+            kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
         SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
-            kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
-        bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
+            kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
+        bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
         void appendSchema (framing::Buffer& buf);
+
+        void encode(framing::Buffer& buffer) const;
+        void decode(framing::Buffer& buffer);
+        uint32_t encodedSize() const;
     };
 
     typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=897955&r1=897954&r2=897955&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Jan 11 17:23:18 2010
@@ -226,5 +226,10 @@
       <field name="queue" type="str8"/>
       <field name="consumer" type="uint32"/>
     </control>
+
+    <!-- Replicate management agent schema -->
+    <control name="management-schema" code="0x35">
+      <field name="data" type="vbin32"/>
+    </control>
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org