You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/06/17 19:08:56 UTC

svn commit: r1686078 - in /qpid/trunk/qpid/cpp: etc/ include/qpid/messaging/ src/qpid/ src/qpid/client/ src/qpid/client/amqp0_10/ src/qpid/framing/ src/qpid/messaging/

Author: gsim
Date: Wed Jun 17 17:08:55 2015
New Revision: 1686078

URL: http://svn.apache.org/r1686078
Log:
QPID-6256: Improved handling of protocol version incompatibilities

* 0-10 path no longer hangs on open when connecting to a broker not
  supporting that version
* the 'protocol' connection option now supports specifying multiple
  protocols to try in order (as a comma-separated list)
* the protocol defaults, i.e. the value assumed if the 'protocol'
  connection option is not specified, can now be set via the client
  config file (e.g. protocol-defaults=amqp1.0,amqp0-10) or an
  environment variable (e.g. QPID_PROTOCOL_DEFAULTS=amqp1.0,amqp0-10)
* if neither the connection option nor the defaults are specified
  all valid versions will be tried (currently amqp0-10, then amqp1.0
  but this may change in a future version)

Modified:
    qpid/trunk/qpid/cpp/etc/qpidc.conf
    qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h
    qpid/trunk/qpid/cpp/src/qpid/Exception.h
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp

Modified: qpid/trunk/qpid/cpp/etc/qpidc.conf
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/etc/qpidc.conf?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/etc/qpidc.conf (original)
+++ qpid/trunk/qpid/cpp/etc/qpidc.conf Wed Jun 17 17:08:55 2015
@@ -21,3 +21,6 @@
 #   name=value
 #
 # (Note: no spaces on either side of '=')
+
+# To make AMQP 1.0 the default, uncomment the following line
+#protocol-defaults=amqp1.0,amqp0-10

Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/exceptions.h Wed Jun 17 17:08:55 2015
@@ -220,6 +220,11 @@ struct QPID_MESSAGING_CLASS_EXTERN Conne
     QPID_MESSAGING_EXTERN ConnectionError(const std::string&);
 };
 
+struct QPID_MESSAGING_CLASS_EXTERN ProtocolVersionError : public ConnectionError
+{
+    QPID_MESSAGING_EXTERN ProtocolVersionError(const std::string&);
+};
+
 struct QPID_MESSAGING_CLASS_EXTERN AuthenticationFailure : public ConnectionError
 {
     QPID_MESSAGING_EXTERN AuthenticationFailure(const std::string&);

Modified: qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/Exception.h Wed Jun 17 17:08:55 2015
@@ -86,6 +86,10 @@ struct TransportFailure : public Excepti
     TransportFailure(const std::string& msg=std::string()) : Exception(msg) {}
 };
 
+struct ProtocolVersionError : public TransportFailure {
+    ProtocolVersionError(const std::string& msg=std::string()) : TransportFailure(msg) {}
+};
+
 } // namespace qpid
 
 #endif  /*!_Exception_*/

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Wed Jun 17 17:08:55 2015
@@ -45,7 +45,7 @@ using namespace qpid::sys;
 namespace qpid {
 namespace client {
 
-Connection::Connection() : version(framing::highestProtocolVersion)
+Connection::Connection() : version(framing::ProtocolVersion(0, 10))
 {
     ConnectionImpl::init();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Jun 17 17:08:55 2015
@@ -283,8 +283,13 @@ void ConnectionImpl::open()
     // If the connect fails then the connector is cleaned up either when we try to connect again
     // - in that case in connector.reset() above;
     // - or when we are deleted
-    handler.waitForOpen();
-    QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+    try {
+        handler.waitForOpen();
+        QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+    } catch (const Exception& e) {
+        connector->checkVersion(version);
+        throw;
+    }
 
     // If the SASL layer has provided an "operational" userId for the connection,
     // put it in the negotiated settings.

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Wed Jun 17 17:08:55 2015
@@ -24,6 +24,7 @@
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/SecurityLayer.h"
+#include "qpid/framing/ProtocolInitiation.h"
 
 #include <map>
 
@@ -68,5 +69,26 @@ void Connector::activateSecurityLayer(st
 {
 }
 
+bool Connector::checkProtocolHeader(framing::Buffer& in, const framing::ProtocolVersion& version)
+{
+    if (!header) {
+        boost::shared_ptr<framing::ProtocolInitiation> protocolInit(new framing::ProtocolInitiation);
+        if (protocolInit->decode(in)) {
+            header = protocolInit;
+            QPID_LOG(debug, "RECV [" << getIdentifier() << "]: INIT(" << *protocolInit << ")");
+            checkVersion(version);
+        }
+    }
+    return header;
+}
+
+void Connector::checkVersion(const framing::ProtocolVersion& version)
+{
+    if (header && !header->matches(version)){
+        throw ProtocolVersionError(QPID_MSG("Incorrect version: " << *header
+                                            << "; expected " << version));
+    }
+}
+
 
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Wed Jun 17 17:08:55 2015
@@ -41,6 +41,8 @@ struct SecuritySettings;
 namespace framing {
 class InputHandler;
 class AMQFrame;
+class Buffer;
+class ProtocolInitiation;
 }
 
 namespace client {
@@ -74,6 +76,11 @@ class Connector : public framing::FrameH
     virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
 
     virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
+    void checkVersion(const framing::ProtocolVersion& version);
+  protected:
+    boost::shared_ptr<framing::ProtocolInitiation> header;
+
+    bool checkProtocolHeader(framing::Buffer&, const framing::ProtocolVersion& version);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Wed Jun 17 17:08:55 2015
@@ -388,18 +388,17 @@ void RdmaConnector::readbuff(Rdma::Async
 size_t RdmaConnector::decode(const char* buffer, size_t size) 
 {
     framing::Buffer in(const_cast<char*>(buffer), size);
-    if (!initiated) {
-        framing::ProtocolInitiation protocolInit;
-        if (protocolInit.decode(in)) {
-            //TODO: check the version is correct
-            QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+    try {
+        if (checkProtocolHeader(in, version)) {
+            AMQFrame frame;
+            while(frame.decode(in)){
+                QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+                input->received(frame);
+            }
         }
-        initiated = true;
-    }
-    AMQFrame frame;
-    while(frame.decode(in)){
-        QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        input->received(frame);
+    } catch (const ProtocolVersionError& e) {
+        QPID_LOG(info, "Closing connection due to " << e.what());
+        close();
     }
     return size - in.available();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Wed Jun 17 17:08:55 2015
@@ -385,23 +385,17 @@ void SslConnector::readbuff(AsynchIO& ai
 size_t SslConnector::decode(const char* buffer, size_t size)
 {
     framing::Buffer in(const_cast<char*>(buffer), size);
-    if (!initiated) {
-        framing::ProtocolInitiation protocolInit;
-        if (protocolInit.decode(in)) {
-            QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
-            if(!(protocolInit==version)){
-                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
-                                         << " supported version " << version));
+    try {
+        if (checkProtocolHeader(in, version)) {
+            AMQFrame frame;
+            while(frame.decode(in)){
+                QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+                input->received(frame);
             }
-            initiated = true;
-        } else {
-            return size - in.available();
         }
-    }
-    AMQFrame frame;
-    while(frame.decode(in)){
-        QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        input->received(frame);
+    } catch (const ProtocolVersionError& e) {
+        QPID_LOG(info, "Closing connection due to " << e.what());
+        close();
     }
     return size - in.available();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Wed Jun 17 17:08:55 2015
@@ -281,23 +281,17 @@ void TCPConnector::readbuff(AsynchIO& ai
 size_t TCPConnector::decode(const char* buffer, size_t size)
 {
     framing::Buffer in(const_cast<char*>(buffer), size);
-    if (!initiated) {
-        framing::ProtocolInitiation protocolInit;
-        if (protocolInit.decode(in)) {
-            QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
-            if(!(protocolInit==version)){
-                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
-                                         << " supported version " << version));
+    try {
+        if (checkProtocolHeader(in, version)) {
+            AMQFrame frame;
+            while(frame.decode(in)){
+                QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+                input->received(frame);
             }
-            initiated = true;
-        } else {
-            return size - in.available();
         }
-    }
-    AMQFrame frame;
-    while(frame.decode(in)){
-        QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        input->received(frame);
+    } catch (const ProtocolVersionError& e) {
+        QPID_LOG(info, "Closing connection due to " << e.what());
+        close();
     }
     return size - in.available();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Wed Jun 17 17:08:55 2015
@@ -305,6 +305,8 @@ bool ConnectionImpl::tryConnect()
             QPID_LOG(info, "Connected to " << *i);
             mergeUrls(connection.getInitialBrokers(), l);
             return resetSessions(l);
+        } catch (const qpid::ProtocolVersionError& e) {
+            throw qpid::messaging::ProtocolVersionError("AMQP 0-10 not supported");
         } catch (const qpid::TransportFailure& e) {
             QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h Wed Jun 17 17:08:55 2015
@@ -47,6 +47,7 @@ public:
     inline uint8_t getMinor() const { return version.getMinor(); }
     inline ProtocolVersion getVersion() const { return version; }
     bool operator==(ProtocolVersion v) const { return v == getVersion(); }
+    bool matches(ProtocolVersion v) const { return v == getVersion(); }
 };
 
 QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& pi);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/Connection.cpp Wed Jun 17 17:08:55 2015
@@ -48,34 +48,35 @@ Connection::Connection(const std::string
     Variant::Map options;
     AddressParser parser(o);
     if (o.empty() || parser.parseMap(options)) {
-        ConnectionImpl* impl = ProtocolRegistry::create(url, options);
-        if (impl) {
-            PI::ctor(*this, impl);
-        } else {
-            PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
-        }
+        PI::ctor(*this, ProtocolRegistry::create(url, options));
     } else {
         throw InvalidOptionString("Invalid option string: " + o);
     }
 }
 Connection::Connection(const std::string& url, const Variant::Map& options)
 {
-    ConnectionImpl* impl = ProtocolRegistry::create(url, options);
-    if (impl) {
-        PI::ctor(*this, impl);
-    } else {
-        PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
-    }
+    PI::ctor(*this, ProtocolRegistry::create(url, options));
 }
 
 Connection::Connection()
 {
     Variant::Map options;
     std::string url = "127.0.0.1:5672";
-    PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
+    PI::ctor(*this, ProtocolRegistry::create(url, options));
 }
 
-void Connection::open() { impl->open(); }
+void Connection::open()
+{
+    while (true) {
+        try {
+            impl->open();
+            return;
+        } catch (const ProtocolVersionError& e) {
+            PI::set(*this, ProtocolRegistry::next(PI::get(impl).get()));
+            QPID_LOG(info, e.what() << ", trying alternative protocol version...");
+        }
+    }
+}
 bool Connection::isOpen() { return impl->isOpen(); }
 bool Connection::isOpen() const { return impl->isOpen(); }
 void Connection::close() { impl->close(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ConnectionImpl.h Wed Jun 17 17:08:55 2015
@@ -22,6 +22,7 @@
  *
  */
 #include <string>
+#include <boost/function.hpp>
 #include "qpid/RefCounted.h"
 
 namespace qpid {
@@ -32,6 +33,7 @@ class Variant;
 
 namespace messaging {
 
+class ProtocolRegistry;
 class Session;
 
 class ConnectionImpl : public virtual qpid::RefCounted
@@ -49,7 +51,10 @@ class ConnectionImpl : public virtual qp
     virtual void reconnect() = 0;
     virtual std::string getUrl() const = 0;
   private:
+  friend class ProtocolRegistry;
+    boost::function<ConnectionImpl*()> next;
 };
+
 }} // namespace qpid::messaging
 
 #endif  /*!QPID_MESSAGING_CONNECTIONIMPL_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.cpp Wed Jun 17 17:08:55 2015
@@ -22,14 +22,116 @@
 #include "qpid/messaging/exceptions.h"
 #include "qpid/client/amqp0_10/ConnectionImpl.h"
 #include "qpid/client/LoadPlugins.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/StringUtils.h"
+#include "config.h"
 #include <map>
+#include <sstream>
+#include <boost/bind.hpp>
 
 using qpid::types::Variant;
 
 namespace qpid {
 namespace messaging {
 namespace {
-typedef std::map<std::string, ProtocolRegistry::Factory*> Registry;
+struct ProtocolOptions : qpid::Options
+{
+    std::string protocolDefaults;
+
+    ProtocolOptions() : qpid::Options("Protocol Settings")
+    {
+        addOptions()
+            ("protocol-defaults", optValue(protocolDefaults, "PROTOCOLS"), "Protocols to use when none are specified");
+    }
+};
+const std::string SEPARATOR(", ");
+const std::string EMPTY;
+std::string join(const std::vector<std::string>& in, const std::string& base=EMPTY, const std::string& separator = SEPARATOR)
+{
+    std::stringstream out;
+    if (!base.empty()) out << base;
+    for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
+        if (i != in.begin()) out << separator;
+        out << *i;
+    }
+    return out.str();
+}
+
+typedef std::map<std::string, ProtocolRegistry::Factory*> Factories;
+
+ConnectionImpl* create_0_10(const std::string& url, const qpid::types::Variant::Map& options)
+{
+    return new qpid::client::amqp0_10::ConnectionImpl(url, options);
+}
+
+class Registry
+{
+  public:
+    Registry()
+    {
+        factories["amqp0-10"] = &create_0_10;
+        CommonOptions common("", "", QPIDC_CONF_FILE);
+        ProtocolOptions options;
+        try {
+            common.parse(0, 0, common.clientConfig, true);
+            options.parse (0, 0, common.clientConfig, true);
+        } catch (const std::exception& e) {
+            throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising Protocol Registry: " << e.what()));
+        }
+        QPID_LOG(debug, "Protocol defaults: " << options.protocolDefaults);
+        if (!options.protocolDefaults.empty()) {
+            split(versions, options.protocolDefaults, ", ");
+        }
+    }
+    ProtocolRegistry::Factory* find(const std::string& name) const
+    {
+        Factories::const_iterator i = factories.find(name);
+        if (i == factories.end()) {
+            std::stringstream error;
+            error << "Unsupported protocol: " << name;
+            error << " (valid values are " << getNames() << ")";
+            throw MessagingException(error.str());
+        } else {
+            return i->second;
+        }
+    }
+    void add(const std::string& name, ProtocolRegistry::Factory* factory)
+    {
+        factories[name] = factory;
+    }
+    std::string getNames() const
+    {
+        std::stringstream names;
+        for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+            if (i != factories.begin()) names << ", ";
+            names << i->first;
+        }
+        return names.str();
+    }
+    void collectNames(std::vector<std::string>& names) const
+    {
+        for (std::vector< std::string >::const_iterator i = versions.begin(); i != versions.end(); ++i) {
+            Factories::const_iterator j = factories.find(*i);
+            if (j == factories.end()) {
+                QPID_LOG(notice, "Unsupported protocol specified in defaults " << *i);
+            } else {
+                names.push_back(*i);
+            }
+        }
+        if (names.empty()) {
+            if (!versions.empty()) {
+                QPID_LOG(warning, "Protocol defaults specified are not valid (" << join(versions) << ") falling back to  " << getNames());
+            }
+            for (Factories::const_iterator i = factories.begin(); i != factories.end(); ++i) {
+                names.push_back(i->first);
+            }
+        }
+    }
+  private:
+    Factories factories;
+    std::vector<std::string> versions;
+};
 
 Registry& theRegistry()
 {
@@ -57,17 +159,42 @@ ConnectionImpl* ProtocolRegistry::create
     qpid::client::theModuleLoader();//ensure modules are loaded
     Variant name;
     Variant::Map stripped;
+    std::vector<std::string> versions;
     if (extract("protocol", name, options, stripped)) {
-        Registry::const_iterator i = theRegistry().find(name.asString());
-        if (i != theRegistry().end()) return (i->second)(url, stripped);
-        else if (name.asString() == "amqp0-10") return new qpid::client::amqp0_10::ConnectionImpl(url, stripped);
-        else throw MessagingException("Unsupported protocol: " + name.asString());
+        split(versions, name.asString(), ", ");
+    } else {
+        theRegistry().collectNames(versions);
     }
-    return 0;
+    bool debugOn;
+    QPID_LOG_TEST(debug, debugOn);
+    if (debugOn) {
+        QPID_LOG(debug, "Trying versions " << join(versions));
+    }
+    return createInternal(versions, url, stripped, join(versions, "No suitable protocol version supported by peer, tried "));
+}
+
+ConnectionImpl* ProtocolRegistry::createInternal(const std::vector<std::string>& requested, const std::string& url, const Variant::Map& options, const std::string& error)
+{
+    std::vector<std::string>::const_iterator i = requested.begin();
+    if (i == requested.end())
+        throw MessagingException(error);
+    std::string name = *i;
+    ConnectionImpl* result = theRegistry().find(name)(url, options);
+    result->next = boost::bind(&ProtocolRegistry::createInternal, std::vector<std::string>(++i, requested.end()), url, options, error);
+    return result;
+ }
+
+ConnectionImpl* ProtocolRegistry::next(ConnectionImpl* last)
+{
+    if (last->next) {
+        return last->next();
+    }
+    throw MessagingException("No suitable protocol version supported by peer");
 }
+
 void ProtocolRegistry::add(const std::string& name, Factory* factory)
 {
-    theRegistry()[name] = factory;
+    theRegistry().add(name, factory);
 }
 
 }} // namespace qpid::messaging

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/ProtocolRegistry.h Wed Jun 17 17:08:55 2015
@@ -23,8 +23,8 @@
  */
 
 #include "qpid/messaging/ImportExport.h"
-
 #include "qpid/types/Variant.h"
+#include <vector>
 
 namespace qpid {
 namespace messaging {
@@ -37,8 +37,10 @@ class ProtocolRegistry
   public:
     typedef ConnectionImpl* Factory(const std::string& url, const qpid::types::Variant::Map& options);
     static ConnectionImpl* create(const std::string& url, const qpid::types::Variant::Map& options);
+    static ConnectionImpl* next(ConnectionImpl*);
     QPID_MESSAGING_EXTERN static void add(const std::string& name, Factory* factory);
   private:
+    static ConnectionImpl* createInternal(const std::vector<std::string>& versions, const std::string& url, const qpid::types::Variant::Map& options, const std::string& error);
 };
 }} // namespace qpid::messaging
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp?rev=1686078&r1=1686077&r2=1686078&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/exceptions.cpp Wed Jun 17 17:08:55 2015
@@ -57,6 +57,7 @@ TransactionUnknown::TransactionUnknown(c
 UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
 
 ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
+ProtocolVersionError::ProtocolVersionError(const std::string& msg) : ConnectionError(msg) {}
 AuthenticationFailure::AuthenticationFailure(const std::string& msg) : ConnectionError(msg) {}
 
 TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org