You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/30 20:46:18 UTC

svn commit: r799401 - in /qpid/trunk/qpid/cpp/src/qpid: amqp_0_10/Connection.cpp amqp_0_10/Connection.h client/Connector.cpp cluster/ConnectionCodec.cpp cluster/ConnectionCodec.h

Author: aconway
Date: Thu Jul 30 18:46:17 2009
New Revision: 799401

URL: http://svn.apache.org/viewvc?rev=799401&view=rev
Log:
Set protocol versions correctly in cluster code.

Cluster code was broken by a recent check-in that added validation of
protocol versions: the cluster was not setting the version correctly on
both sides of a connection.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=799401&r1=799400&r2=799401&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Thu Jul 30 18:46:17 2009
@@ -31,7 +31,8 @@
 using sys::Mutex;
 
 Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
-    : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+    : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+      isClient(_isClient), buffered(0), version(0,10)
 {}
 
 void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
@@ -44,7 +45,9 @@
         //read in protocol header
         framing::ProtocolInitiation pi;
         if (pi.decode(in)) {
-            //TODO: check the version is correct
+            if(!(pi==version))
+                throw Exception(QPID_MSG("Unsupported version: " << pi
+                                         << " supported version " << version));
             QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
         }
         initialized = true;
@@ -128,7 +131,11 @@
 }
 
 framing::ProtocolVersion Connection::getVersion() const {
-    return framing::ProtocolVersion(0,10);
+    return version;
+}
+
+void Connection::setVersion(const framing::ProtocolVersion& v)  {
+    version = v;
 }
 
 size_t Connection::getBuffered() const {

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=799401&r1=799400&r2=799401&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu Jul 30 18:46:17 2009
@@ -55,6 +55,7 @@
     bool initialized;
     bool isClient;
     size_t buffered;
+    framing::ProtocolVersion version;
 
   public:
     QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient);
@@ -71,6 +72,10 @@
     void send(framing::AMQFrame&);
     framing::ProtocolVersion getVersion() const;
     size_t getBuffered() const;
+
+    /** Used by cluster code to set a special version on "update" connections. */
+    // FIXME aconway 2009-07-30: find a cleaner mechanism for this.
+    void setVersion(const framing::ProtocolVersion&);
 };
 
 }} // namespace qpid::amqp_0_10

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=799401&r1=799400&r2=799401&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jul 30 18:46:17 2009
@@ -362,7 +362,8 @@
         if (protocolInit.decode(in)) {
             QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
             if(!(protocolInit==version)){
-                throw Exception(QPID_MSG("Unsupported version: " << protocolInit));
+                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+                                         << " supported version " << version));
             }
         }
         initiated = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=799401&r1=799400&r2=799401&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Jul 30 18:46:17 2009
@@ -38,24 +38,27 @@
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
     if (v == ProtocolVersion(0, 10))
-        return new ConnectionCodec(out, id, cluster, false, false);
-    else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
-        return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
+        return new ConnectionCodec(v, out, id, cluster, false, false);
+    else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
+        return new ConnectionCodec(v, out, id, cluster, true, false); 
     return 0;
 }
 
 // Used for outgoing Link connections
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
-    return new ConnectionCodec(out, logId, cluster, false, true);
+    return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
-    : codec(out, logId, isLink),
-      interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
+ConnectionCodec::ConnectionCodec(
+    const ProtocolVersion& v, sys::OutputControl& out,
+    const std::string& logId, Cluster& cluster, bool catchUp, bool isLink
+) : codec(out, logId, isLink),
+    interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);
+    codec.setVersion(v);
 }
 
 ConnectionCodec::~ConnectionCodec() {}

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=799401&r1=799400&r2=799401&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Jul 30 18:46:17 2009
@@ -56,7 +56,8 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
+    ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
+                    const std::string& logId, Cluster& c, bool catchUp, bool isLink);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org