You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [11/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.h Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,58 +34,57 @@
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
-    class Commit{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Commit(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-    class Rollback{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Rollback(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-
-    boost::intrusive_ptr<Message> msg;
-    std::list<boost::shared_ptr<Queue> > queues;
-    std::list<boost::shared_ptr<Queue> > prepared;
-
-    void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
-  public:
-    QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
-    QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
-    QPID_BROKER_EXTERN virtual void commit() throw();
-    QPID_BROKER_EXTERN virtual void rollback() throw();
-
-    virtual Message& getMessage() { return *msg; };
-
-    QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
-    virtual ~TxPublish(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
-    QPID_BROKER_EXTERN uint64_t contentSize();
-
-    boost::intrusive_ptr<Message> getMessage() const { return msg; }
-    const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
-    const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
+    namespace broker {
+        /**
+         * Defines the behaviour for publish operations on a
+         * transactional channel. Messages are routed through
+         * exchanges when received but are not at that stage delivered
+         * to the matching queues, rather the queues are held in an
+         * instance of this class. On prepare() the message is marked
+         * enqueued to the relevant queues in the MessagesStore. On
+         * commit() the messages will be passed to the queue for
+         * dispatch or to be added to the in-memory queue.
+         */
+        class TxPublish : public TxOp, public Deliverable{
+
+            class Commit{
+                boost::intrusive_ptr<Message>& msg;
+            public:
+                Commit(boost::intrusive_ptr<Message>& msg);
+                void operator()(const boost::shared_ptr<Queue>& queue);            
+            };
+            class Rollback{
+                boost::intrusive_ptr<Message>& msg;
+            public:
+                Rollback(boost::intrusive_ptr<Message>& msg);
+                void operator()(const boost::shared_ptr<Queue>& queue);            
+            };
+
+            boost::intrusive_ptr<Message> msg;
+             std::list<boost::shared_ptr<Queue> > queues;
+            std::list<boost::shared_ptr<Queue> > prepared;
+
+            void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
+
+        public:
+            QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
+            QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
+            QPID_BROKER_EXTERN virtual void commit() throw();
+            QPID_BROKER_EXTERN virtual void rollback() throw();
+
+	    virtual Message& getMessage() { return *msg; };
+            
+            QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
+
+            virtual ~TxPublish(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+            QPID_BROKER_EXTERN uint64_t contentSize();
+
+            boost::intrusive_ptr<Message> getMessage() const { return msg; }
+            const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
+        };
+    }
 }
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/BrokerDefaults.cpp Fri Oct 21 01:19:00 2011
@@ -31,16 +31,10 @@ const std::string Broker::Options::DEFAU
 std::string
 Broker::Options::getHome() {
     std::string home;
-#ifdef _MSC_VER
     char home_c[MAX_PATH+1];
     size_t unused;
     if (0 == getenv_s (&unused, home_c, sizeof(home_c), "HOME"))
         home += home_c;
-#else
-    char *home_c = getenv("HOME");
-    if (home_c)
-        home += home_c;
-#endif
     return home;
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Fri Oct 21 01:19:00 2011
@@ -42,7 +42,7 @@ public:
     NullAuthenticator(Connection& connection);
     ~NullAuthenticator();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string* response);
+    void start(const std::string& mechanism, const std::string& response);
     void step(const std::string&) {}
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
 };
@@ -57,7 +57,7 @@ public:
     SspiAuthenticator(Connection& connection);
     ~SspiAuthenticator();
     void getMechanisms(framing::Array& mechanisms);
-    void start(const std::string& mechanism, const std::string* response);
+    void start(const std::string& mechanism, const std::string& response);
     void step(const std::string& response);
     std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
 };
@@ -93,15 +93,14 @@ NullAuthenticator::~NullAuthenticator() 
 void NullAuthenticator::getMechanisms(Array& mechanisms)
 {
     mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
-    mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));
 }
 
-void NullAuthenticator::start(const string& mechanism, const string* response)
+void NullAuthenticator::start(const string& mechanism, const string& response)
 {
     QPID_LOG(warning, "SASL: No Authentication Performed");
     if (mechanism == "PLAIN") { // Old behavior
-        if (response && response->size() > 0 && (*response).c_str()[0] == (char) 0) {
-            string temp = response->substr(1);
+        if (response.size() > 0 && response[0] == (char) 0) {
+            string temp = response.substr(1);
             string::size_type i = temp.find((char)0);
             string uid = temp.substr(0, i);
             string pwd = temp.substr(i + 1);
@@ -139,7 +138,7 @@ void SspiAuthenticator::getMechanisms(Ar
     QPID_LOG(info, "SASL: Mechanism list: ANONYMOUS PLAIN");
 }
 
-void SspiAuthenticator::start(const string& mechanism, const string* response)
+void SspiAuthenticator::start(const string& mechanism, const string& response)
 {
     QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
     if (mechanism == "ANONYMOUS") {
@@ -152,19 +151,16 @@ void SspiAuthenticator::start(const stri
 
     // PLAIN's response is composed of 3 strings separated by 0 bytes:
     // authorization id, authentication id (user), clear-text password.
-    if (!response || response->size() == 0)
+    if (response.size() == 0)
         throw ConnectionForcedException("Authentication failed");
 
-    string::size_type i = response->find((char)0);
-    string auth = response->substr(0, i);
-    string::size_type j = response->find((char)0, i+1);
-    string uid = response->substr(i+1, j-1);
-    string pwd = response->substr(j+1);
-    string dot(".");
+    string::size_type i = response.find((char)0);
+    string auth = response.substr(0, i);
+    string::size_type j = response.find((char)0, i+1);
+    string uid = response.substr(i+1, j-1);
+    string pwd = response.substr(j+1);
     int error = 0;
-    if (!LogonUser(const_cast<char*>(uid.c_str()),
-                   const_cast<char*>(dot.c_str()),
-                   const_cast<char*>(pwd.c_str()),
+    if (!LogonUser(uid.c_str(), ".", pwd.c_str(),
                    LOGON32_LOGON_NETWORK,
                    LOGON32_PROVIDER_DEFAULT,
                    &userToken))
@@ -180,7 +176,7 @@ void SspiAuthenticator::start(const stri
     client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
 }
         
-void SspiAuthenticator::step(const string& /*response*/)
+void SspiAuthenticator::step(const string& response)
 {
   QPID_LOG(info, "SASL: Need another step!!!");
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Oct 21 01:19:00 2011
@@ -27,14 +27,10 @@
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/windows/SslAsynchIO.h"
-
 #include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
 #include <memory>
-
 // security.h needs to see this to distinguish from kernel use.
 #define SECURITY_WIN32
 #include <security.h>
@@ -72,10 +68,9 @@ struct SslServerOptions : qpid::Options
 };
 
 class SslProtocolFactory : public qpid::sys::ProtocolFactory {
+    qpid::sys::Socket listener;
     const bool tcpNoDelay;
-    boost::ptr_vector<Socket> listeners;
-    boost::ptr_vector<AsynchAcceptor> acceptors;
-    uint16_t listeningPort;
+    const uint16_t listeningPort;
     std::string brokerHost;
     const bool clientAuthSelected;
     std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
@@ -83,14 +78,15 @@ class SslProtocolFactory : public qpid::
     CredHandle credHandle;
 
   public:
-    SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay);
+    SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
     ~SslProtocolFactory();
     void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
-    void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
+    void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port,
                  sys::ConnectionCodec::Factory*,
                  ConnectFailedCallback failed);
 
     uint16_t getPort() const;
+    std::string getHost() const;
     bool supports(const std::string& capability);
 
   private:
@@ -119,7 +115,6 @@ static struct SslPlugin : public Plugin 
             try {
                 const broker::Broker::Options& opts = broker->getOptions();
                 ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
-                                                                            "", boost::lexical_cast<std::string>(options.port),
                                                                             opts.connectionBacklog,
                                                                             opts.tcpNoDelay));
                 QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
@@ -132,13 +127,12 @@ static struct SslPlugin : public Plugin 
 } sslPlugin;
 
 SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
-                                       const std::string& host, const std::string& port, int backlog,
+                                       int backlog,
                                        bool nodelay)
     : tcpNoDelay(nodelay),
+      listeningPort(listener.listen(options.port, backlog)),
       clientAuthSelected(options.clientAuth) {
 
-    // Make sure that certificate store is good before listening to sockets
-    // to avoid having open and listening sockets when there is no cert store
     SecInvalidateHandle(&credHandle);
 
     // Get the certificate for this server.
@@ -183,23 +177,6 @@ SslProtocolFactory::SslProtocolFactory(c
         throw QPID_WINDOWS_ERROR(status);
     ::CertFreeCertificateContext(certContext);
     ::CertCloseStore(certStoreHandle, 0);
-
-    // Listen to socket(s)
-    SocketAddress sa(host, port);
-
-    // We must have at least one resolved address
-    QPID_LOG(info, "SSL Listening to: " << sa.asString())
-    Socket* s = new Socket;
-    listeningPort = s->listen(sa, backlog);
-    listeners.push_back(s);
-
-    // Try any other resolved addresses
-    while (sa.nextAddress()) {
-        QPID_LOG(info, "SSL Listening to: " << sa.asString())
-        Socket* s = new Socket;
-        s->listen(sa, backlog);
-        listeners.push_back(s);
-    }
 }
 
 SslProtocolFactory::~SslProtocolFactory() {
@@ -260,19 +237,21 @@ uint16_t SslProtocolFactory::getPort() c
     return listeningPort; // Immutable no need for lock.
 }
 
+std::string SslProtocolFactory::getHost() const {
+    return listener.getSockname();
+}
+
 void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
                                 sys::ConnectionCodec::Factory* fact) {
-    for (unsigned i = 0; i<listeners.size(); ++i) {
-        acceptors.push_back(
-            AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
-        acceptors[i].start(poller);
-    }
+    acceptor.reset(
+        AsynchAcceptor::create(listener,
+                               boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+    acceptor->start(poller);
 }
 
 void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
                                  const std::string& host,
-                                 const std::string& port,
+                                 int16_t port,
                                  sys::ConnectionCodec::Factory* fact,
                                  ConnectFailedCallback failed)
 {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionHandler.cpp Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@
 #include "qpid/client/ConnectionHandler.h"
 
 #include "qpid/SaslFactory.h"
-#include "qpid/StringUtils.h"
 #include "qpid/client/Bounds.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/framing/all_method_bodies.h"
@@ -143,9 +142,7 @@ void ConnectionHandler::outgoing(AMQFram
 void ConnectionHandler::waitForOpen()
 {
     waitFor(ESTABLISHED);
-    if (getState() == FAILED) {
-        throw TransportFailure(errorText);
-    } else if (getState() == CLOSED) {
+    if (getState() == FAILED || getState() == CLOSED) {
         throw ConnectionException(errorCode, errorText);
     }
 }
@@ -205,24 +202,6 @@ void ConnectionHandler::fail(const std::
 
 namespace {
 std::string SPACE(" ");
-
-std::string join(const std::vector<std::string>& in)
-{
-    std::string result;
-    for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
-        if (result.size()) result += SPACE;
-        result += *i;
-    }
-    return result;
-}
-
-void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results)
-{
-    for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) {
-        if (std::find(b.begin(), b.end(), *i) != b.end())  results.push_back(*i);
-    }
-}
-
 }
 
 void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
@@ -237,35 +216,26 @@ void ConnectionHandler::start(const Fiel
                                               maxSsf
                                             );
 
-    std::vector<std::string> mechlist;
-    if (mechanism.empty()) {
-        //mechlist is simply what the server offers
-        mechanisms.collect(mechlist);
-    } else {
-        //mechlist is the intersection of those indicated by user and
-        //those supported by server, in the order listed by user
-        std::vector<std::string> allowed = split(mechanism, " ");
-        std::vector<std::string> supported;
-        mechanisms.collect(supported);
-        intersection(allowed, supported, mechlist);
-        if (mechlist.empty()) {
-            throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")"));
+    std::string mechlist;
+    bool chosenMechanismSupported = mechanism.empty();
+    for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
+        if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
+            chosenMechanismSupported = true;
+            mechlist = (*i)->get<std::string>() + SPACE + mechlist;
+        } else {
+            if (i != mechanisms.begin()) mechlist += SPACE;
+            mechlist += (*i)->get<std::string>();
         }
     }
 
+    if (!chosenMechanismSupported) {
+        fail("Selected mechanism not supported: " + mechanism);
+    }
+
     if (sasl.get()) {
-        string response;
-        if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) {
-            proxy.startOk(properties, sasl->getMechanism(), response, locale);
-        } else {
-            //response was null
-            ConnectionStartOkBody body;
-            body.setClientProperties(properties);
-            body.setMechanism(sasl->getMechanism());
-            //Don't set response, as none was given
-            body.setLocale(locale);
-            proxy.send(body);
-        }
+        string response = sasl->start(mechanism.empty() ? mechlist : mechanism,
+                                      getSecuritySettings ? getSecuritySettings() : 0);
+        proxy.startOk(properties, sasl->getMechanism(), response, locale);
     } else {
         //TODO: verify that desired mechanism and locale are supported
         string response = ((char)0) + username + ((char)0) + password;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -36,7 +36,6 @@
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
 #include <boost/shared_ptr.hpp>
 
 #include <limits>
@@ -259,16 +258,16 @@ void ConnectionImpl::open()
     connector->setInputHandler(&handler);
     connector->setShutdownHandler(this);
     try {
-        std::string p = boost::lexical_cast<std::string>(port);
-        connector->connect(host, p);
-
+        connector->connect(host, port);
+    
     } catch (const std::exception& e) {
         QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
         connector.reset();
-        throw TransportFailure(e.what());
+        throw;
     }
     connector->init();
-
+    QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
+ 
     // Enable heartbeat if requested
     uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
     if (heartbeat) {
@@ -282,7 +281,6 @@ void ConnectionImpl::open()
     // - in that case in connector.reset() above;
     // - or when we are deleted
     handler.waitForOpen();
-    QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
 
     // If the SASL layer has provided an "operational" userId for the connection,
     // put it in the negotiated settings.

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/Connector.h Fri Oct 21 01:19:00 2011
@@ -61,7 +61,7 @@ class Connector : public framing::Output
     static void registerFactory(const std::string& proto, Factory* connectorFactory);
 
     virtual ~Connector() {};
-    virtual void connect(const std::string& host, const std::string& port) = 0;
+    virtual void connect(const std::string& host, int port) = 0;
     virtual void init() {};
     virtual void close() = 0;
     virtual void send(framing::AMQFrame& frame) = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/RdmaConnector.cpp Fri Oct 21 01:19:00 2011
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector, 
 
     std::string identifier;
 
-    void connect(const std::string& host, const std::string& port);
+    void connect(const std::string& host, int port);
     void close();
     void send(framing::AMQFrame& frame);
     void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
     }
 }
 
-void RdmaConnector::connect(const std::string& host, const std::string& port){
+void RdmaConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(dataConnectedLock);
     assert(!dataConnected);
 
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::s
         boost::bind(&RdmaConnector::disconnected, this),
         boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
 
-    SocketAddress sa(host, port);
+    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
     acon->start(poller, sa);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/SessionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -170,7 +170,6 @@ Demux& SessionImpl::getDemux()
 void SessionImpl::waitForCompletion(const SequenceNumber& id)
 {
     Lock l(state);
-    sys::Waitable::ScopedWait w(state);
     waitForCompletionImpl(id);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/SslConnector.cpp Fri Oct 21 01:19:00 2011
@@ -114,7 +114,7 @@ class SslConnector : public Connector
 
     std::string identifier;
 
-    void connect(const std::string& host, const std::string& port);
+    void connect(const std::string& host, int port);
     void init();
     void close();
     void send(framing::AMQFrame& frame);
@@ -190,14 +190,14 @@ SslConnector::~SslConnector() {
     close();
 }
 
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     try {
         socket.connect(host, port);
     } catch (const std::exception& e) {
         socket.close();
-        throw TransportFailure(e.what());
+        throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what());
     }
 
     identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.cpp Fri Oct 21 01:19:00 2011
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
     close();
 }
 
-void TCPConnector::connect(const std::string& host, const std::string& port) {
+void TCPConnector::connect(const std::string& host, int port) {
     Mutex::ScopedLock l(lock);
     assert(closed);
     connector = AsynchConnector::create(
@@ -117,11 +117,11 @@ void TCPConnector::connected(const Socke
 
 void TCPConnector::start(sys::AsynchIO* aio_) {
     aio = aio_;
-    for (int i = 0; i < 4; i++) {
+    for (int i = 0; i < 32; i++) {
         aio->queueReadBuffer(new Buff(maxFrameSize));
     }
 
-    identifier = str(format("[%1%]") % socket.getFullAddress());
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
 }
 
 void TCPConnector::initAmqp() {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/TCPConnector.h Fri Oct 21 01:19:00 2011
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, p
 
 protected:
     virtual ~TCPConnector();
-    void connect(const std::string& host, const std::string& port);
+    void connect(const std::string& host, int port);
     void start(sys::AsynchIO* aio_);
     void initAmqp();
     virtual void connectFailed(const std::string& msg);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Fri Oct 21 01:19:00 2011
@@ -30,23 +30,12 @@ void AcceptTracker::State::accept()
     unaccepted.clear();
 }
 
-SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
 {
-    SequenceSet accepting;
-    if (cumulative) {
-        for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
-            accepting.add(*i);
-        }
-        unconfirmed.add(accepting);
-        unaccepted.remove(accepting);
-    } else {
-        if (unaccepted.contains(id)) {
-            unaccepted.remove(id);
-            unconfirmed.add(id);
-            accepting.add(id);
-        }
+    if (unaccepted.contains(id)) {
+        unaccepted.remove(id);
+        unconfirmed.add(id);
     }
-    return accepting;
 }
 
 void AcceptTracker::State::release()
@@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std:
     destinationState[destination].unaccepted.add(id);
 }
 
-namespace
-{
-const size_t FLUSH_FREQUENCY = 1024;
-}
-
-void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
-{
-    pending.push_back(record);
-    if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
-}
-
-
 void AcceptTracker::accept(qpid::client::AsyncSession& session)
 {
     for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
@@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client:
     Record record;
     record.status = session.messageAccept(aggregateState.unaccepted);
     record.accepted = aggregateState.unaccepted;
-    addToPending(session, record);
+    pending.push_back(record);
     aggregateState.accept();
 }
 
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
 {
     for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
-        i->second.accept(id, cumulative);
+        i->second.accept(id);
     }
     Record record;
-    record.accepted = aggregateState.accept(id, cumulative);
+    record.accepted.add(id);
     record.status = session.messageAccept(record.accepted);
-    addToPending(session, record);
+    pending.push_back(record);
+    aggregateState.accept(id);
 }
 
 void AcceptTracker::release(qpid::client::AsyncSession& session)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Fri Oct 21 01:19:00 2011
@@ -42,7 +42,7 @@ class AcceptTracker
   public:
     void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
     void accept(qpid::client::AsyncSession&);
-    void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
+    void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
     void release(qpid::client::AsyncSession&);
     uint32_t acceptsPending();
     uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
         qpid::framing::SequenceSet unconfirmed;
 
         void accept();
-        qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
+        void accept(qpid::framing::SequenceNumber);
         void release();
         uint32_t acceptsPending();
         void completed(qpid::framing::SequenceSet&);
@@ -79,7 +79,6 @@ class AcceptTracker
     StateMap destinationState;
     Records pending;
 
-    void addToPending(qpid::client::AsyncSession&, const Record&);
     void checkPending();
     void completed(qpid::framing::SequenceSet&);
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Fri Oct 21 01:19:00 2011
@@ -129,10 +129,6 @@ const std::string HEADERS_EXCHANGE("head
 const std::string XML_EXCHANGE("xml");
 const std::string WILDCARD_ANY("#");
 
-//exchange prefixes:
-const std::string PREFIX_AMQ("amq.");
-const std::string PREFIX_QPID("qpid.");
-
 const Verifier verifier;
 }
 
@@ -203,7 +199,6 @@ class Exchange : protected Node
     void checkCreate(qpid::client::AsyncSession&, CheckMode);
     void checkAssert(qpid::client::AsyncSession&, CheckMode);
     void checkDelete(qpid::client::AsyncSession&, CheckMode);
-    bool isReservedName();
 
   protected:
     const std::string specifiedType;
@@ -238,8 +233,6 @@ class Subscription : public Exchange, pu
     const bool reliable;
     const bool durable;
     const std::string actualType;
-    const bool exclusiveQueue;
-    const bool exclusiveSubscription;
     FieldTable queueOptions;
     FieldTable subscriptionOptions;
     Bindings bindings;
@@ -314,7 +307,6 @@ struct Opt
     Opt& operator/(const std::string& name);
     operator bool() const;
     std::string str() const;
-    bool asBool(bool defaultValue) const;
     const Variant::List& asList() const;
     void collect(qpid::framing::FieldTable& args) const;
 
@@ -346,12 +338,6 @@ Opt::operator bool() const
     return value && !value->isVoid() && value->asBool();
 }
 
-bool Opt::asBool(bool defaultValue) const
-{
-    if (value) return value->asBool();
-    else return defaultValue;
-}
-
 std::string Opt::str() const
 {
     if (value) return value->asString();
@@ -495,7 +481,7 @@ std::string Subscription::getSubscriptio
     if (name.empty()) {
         return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
     } else {
-        return name;
+        return (boost::format("%1%_%2%") % base % name).str();
     }
 }
 
@@ -504,9 +490,7 @@ Subscription::Subscription(const Address
       queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
       reliable(AddressResolution::is_reliable(address)),
       durable(Opt(address)/LINK/DURABLE),
-      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
-      exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
-      exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
 {
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -566,7 +550,7 @@ void Subscription::subscribe(qpid::clien
     checkAssert(session, FOR_RECEIVER);
 
     //create subscription queue:
-    session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
+    session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
                          arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
     //'default' binding:
     bindings.bind(session);
@@ -575,15 +559,15 @@ void Subscription::subscribe(qpid::clien
     linkBindings.bind(session);
     //subscribe to subscription queue:
     AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
-    session.messageSubscribe(arg::queue=queue, arg::destination=destination,
-                             arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+    session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
+                             arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
 }
 
 void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
     linkBindings.unbind(session);
     session.messageCancel(destination);
-    if (exclusiveQueue) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
+    session.queueDelete(arg::queue=queue);
     checkDelete(session, FOR_RECEIVER);
 }
 
@@ -777,32 +761,18 @@ Exchange::Exchange(const Address& a) : N
     linkBindings.setDefaultExchange(name);
 }
 
-bool Exchange::isReservedName()
-{
-    return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
-}
-
 void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
 {
     if (enabled(createPolicy, mode)) {
         try {
-            if (isReservedName()) {
-                try {
-                    sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
-                } catch (const qpid::framing::NotFoundException& /*e*/) {
-                    throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
-                }
-
-            } else {
-                std::string type = specifiedType;
-                if (type.empty()) type = TOPIC_EXCHANGE;
-                session.exchangeDeclare(arg::exchange=name,
-                                        arg::type=type,
-                                        arg::durable=durable,
-                                        arg::autoDelete=autoDelete,
-                                        arg::alternateExchange=alternateExchange,
-                                        arg::arguments=arguments);
-            }
+            std::string type = specifiedType;
+            if (type.empty()) type = TOPIC_EXCHANGE;
+            session.exchangeDeclare(arg::exchange=name,
+                                          arg::type=type,
+                                          arg::durable=durable,
+                                          arg::autoDelete=autoDelete,
+                                          arg::alternateExchange=alternateExchange,
+                                          arg::arguments=arguments);
             nodeBindings.bind(session);
             session.sync();
         } catch (const qpid::framing::NotAllowedException& e) {
@@ -852,7 +822,7 @@ void Exchange::checkAssert(qpid::client:
                 FieldTable::ValuePtr v = result.getArguments().get(i->first);
                 if (!v) {
                     throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
-                } else if (*i->second != *v) {
+                } else if (i->second != v) {
                     throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
                                           % i->first % name % *(i->second) % *v).str());
                 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -20,6 +20,7 @@
  */
 #include "ConnectionImpl.h"
 #include "SessionImpl.h"
+#include "SimpleUrlParser.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/PrivateImplRef.h"
@@ -38,18 +39,26 @@ using qpid::types::Variant;
 using qpid::types::VAR_LIST;
 using qpid::framing::Uuid;
 
-namespace {
-void merge(const std::string& value, std::vector<std::string>& list) {
-    if (std::find(list.begin(), list.end(), value) == list.end())
-        list.push_back(value);
+void convert(const Variant::List& from, std::vector<std::string>& to)
+{
+    for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
+        to.push_back(i->asString());
+    }
 }
 
-void merge(const Variant::List& from, std::vector<std::string>& to)
+template <class T> bool setIfFound(const Variant::Map& map, const std::string& key, T& value)
 {
-    for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
-        merge(i->asString(), to);
+    Variant::Map::const_iterator i = map.find(key);
+    if (i != map.end()) {
+        value = (T) i->second;
+        QPID_LOG(debug, "option " << key << " specified as " << i->second);
+        return true;
+    } else {
+        return false;
+    }
 }
 
+namespace {
 std::string asString(const std::vector<std::string>& v) {
     std::stringstream os;
     os << "[";
@@ -62,8 +71,49 @@ std::string asString(const std::vector<s
 }
 }
 
+template <> bool setIfFound< std::vector<std::string> >(const Variant::Map& map,
+                                            const std::string& key,
+                                            std::vector<std::string>& value)
+{
+    Variant::Map::const_iterator i = map.find(key);
+    if (i != map.end()) {
+        value.clear();
+        if (i->second.getType() == VAR_LIST) {
+            convert(i->second.asList(), value);
+        } else {
+            value.push_back(i->second.asString());
+        }
+        QPID_LOG(debug, "option " << key << " specified as " << asString(value));
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void convert(const Variant::Map& from, ConnectionSettings& to)
+{
+    setIfFound(from, "username", to.username);
+    setIfFound(from, "password", to.password);
+    setIfFound(from, "sasl-mechanism", to.mechanism);
+    setIfFound(from, "sasl-service", to.service);
+    setIfFound(from, "sasl-min-ssf", to.minSsf);
+    setIfFound(from, "sasl-max-ssf", to.maxSsf);
+
+    setIfFound(from, "heartbeat", to.heartbeat);
+    setIfFound(from, "tcp-nodelay", to.tcpNoDelay);
+
+    setIfFound(from, "locale", to.locale);
+    setIfFound(from, "max-channels", to.maxChannels);
+    setIfFound(from, "max-frame-size", to.maxFrameSize);
+    setIfFound(from, "bounds", to.bounds);
+
+    setIfFound(from, "transport", to.protocol);
+
+    setIfFound(from, "ssl-cert-name", to.sslCertName);
+}
+
 ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
-    replaceUrls(false), reconnect(false), timeout(-1), limit(-1),
+    reconnect(false), timeout(-1), limit(-1),
     minReconnectInterval(3), maxReconnectInterval(60),
     retries(0), reconnectOnLimitExceeded(true)
 {
@@ -74,69 +124,27 @@ ConnectionImpl::ConnectionImpl(const std
 
 void ConnectionImpl::setOptions(const Variant::Map& options)
 {
-    for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
-        setOption(i->first, i->second);
+    sys::Mutex::ScopedLock l(lock);
+    convert(options, settings);
+    setIfFound(options, "reconnect", reconnect);
+    setIfFound(options, "reconnect-timeout", timeout);
+    setIfFound(options, "reconnect-limit", limit);
+    int64_t reconnectInterval;
+    if (setIfFound(options, "reconnect-interval", reconnectInterval)) {
+        minReconnectInterval = maxReconnectInterval = reconnectInterval;
+    } else {
+        setIfFound(options, "reconnect-interval-min", minReconnectInterval);
+        setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
     }
+    setIfFound(options, "reconnect-urls", urls);
+    setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
 }
 
 void ConnectionImpl::setOption(const std::string& name, const Variant& value)
 {
-    sys::Mutex::ScopedLock l(lock);
-    if (name == "reconnect") {
-        reconnect = value;
-    } else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
-        timeout = value;
-    } else if (name == "reconnect-limit" || name == "reconnect_limit") {
-        limit = value;
-    } else if (name == "reconnect-interval" || name == "reconnect_interval") {
-        maxReconnectInterval = minReconnectInterval = value;
-    } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") {
-        minReconnectInterval = value;
-    } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") {
-        maxReconnectInterval = value;
-    } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") {
-        replaceUrls = value.asBool();
-    } else if (name == "reconnect-urls" || name == "reconnect_urls") {
-        if (replaceUrls) urls.clear();
-        if (value.getType() == VAR_LIST) {
-            merge(value.asList(), urls);
-        } else {
-            merge(value.asString(), urls);
-        }
-    } else if (name == "username") {
-        settings.username = value.asString();
-    } else if (name == "password") {
-        settings.password = value.asString();
-    } else if (name == "sasl-mechanism" || name == "sasl_mechanism" ||
-               name == "sasl-mechanisms" || name == "sasl_mechanisms") {
-        settings.mechanism = value.asString();
-    } else if (name == "sasl-service" || name == "sasl_service") {
-        settings.service = value.asString();
-    } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") {
-        settings.minSsf = value;
-    } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") {
-        settings.maxSsf = value;
-    } else if (name == "heartbeat") {
-        settings.heartbeat = value;
-    } else if (name == "tcp-nodelay" || name == "tcp_nodelay") {
-        settings.tcpNoDelay = value;
-    } else if (name == "locale") {
-        settings.locale = value.asString();
-    } else if (name == "max-channels" || name == "max_channels") {
-        settings.maxChannels = value;
-    } else if (name == "max-frame-size" || name == "max_frame_size") {
-        settings.maxFrameSize = value;
-    } else if (name == "bounds") {
-        settings.bounds = value;
-    } else if (name == "transport") {
-        settings.protocol = value.asString();
-    } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
-        settings.sslCertName = value.asString();
-    } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
-        reconnectOnLimitExceeded = value;
-    } else {
-        throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
-    }
+    Variant::Map options;
+    options[name] = value;
+    setOptions(options);
 }
 
 
@@ -206,7 +214,7 @@ qpid::messaging::Session ConnectionImpl:
             sessions[name] = impl;
             break;
         } catch (const qpid::TransportFailure&) {
-            reopen();
+            open();
         } catch (const qpid::SessionException& e) {
             throw qpid::messaging::SessionError(e.what());
         } catch (const std::exception& e) {
@@ -227,15 +235,6 @@ void ConnectionImpl::open()
     catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
 }
 
-void ConnectionImpl::reopen()
-{
-    if (!reconnect) {
-        throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
-    }
-    open();
-}
-
-
 bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
 {
     if (timeout == 0) return true;
@@ -263,9 +262,14 @@ void ConnectionImpl::connect(const qpid:
 }
 
 void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
-    for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
-        merge(i->str(), urls);
-    QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+    if (more.size()) {
+        for (size_t i = 0; i < more.size(); ++i) {
+            if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
+                urls.push_back(more[i].str());
+            }
+        }
+        QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
+    }
 }
 
 bool ConnectionImpl::tryConnect()
@@ -274,14 +278,21 @@ bool ConnectionImpl::tryConnect()
     for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
         try {
             QPID_LOG(info, "Trying to connect to " << *i << "...");
-            Url url(*i);
-            if (url.getUser().size()) settings.username = url.getUser();
-            if (url.getPass().size()) settings.password = url.getPass();
-            connection.open(url, settings);
+            //TODO: when url support is more complete can avoid this test here
+            if (i->find("amqp:") == 0) {
+                Url url(*i);
+                connection.open(url, settings);
+            } else {
+                SimpleUrlParser::parse(*i, settings);
+                connection.open(settings);
+            }
             QPID_LOG(info, "Connected to " << *i);
             mergeUrls(connection.getInitialBrokers(), l);
             return resetSessions(l);
-        } catch (const qpid::TransportFailure& e) {
+        } catch (const qpid::ConnectionException& e) {
+            //TODO: need to fix timeout on
+            //qpid::client::Connection::open() so that it throws
+            //TransportFailure rather than a ConnectionException
             QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
         }
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Fri Oct 21 01:19:00 2011
@@ -43,7 +43,6 @@ class ConnectionImpl : public qpid::mess
   public:
     ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
     void open();
-    void reopen();
     bool isOpen() const;
     void close();
     qpid::messaging::Session newSession(bool transactional, const std::string& name);
@@ -60,7 +59,6 @@ class ConnectionImpl : public qpid::mess
     qpid::sys::Semaphore semaphore;//used to coordinate reconnection
     Sessions sessions;
     qpid::client::Connection connection;
-    bool replaceUrls;     // Replace rather than merging with reconnect-urls
     std::vector<std::string> urls;
     qpid::client::ConnectionSettings settings;
     bool reconnect;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Fri Oct 21 01:19:00 2011
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
     acceptTracker.accept(session);
 }
 
-void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id)
 {
     sys::Mutex::ScopedLock l(lock);
-    acceptTracker.accept(id, session, cumulative);
+    acceptTracker.accept(id, session);
 }
 
 
@@ -301,7 +301,6 @@ const std::string SUBJECT("qpid.subject"
 const std::string X_APP_ID("x-amqp-0-10.app-id");
 const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
 const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
-const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
 }
 
 void populateHeaders(qpid::messaging::Message& message, 
@@ -335,13 +334,10 @@ void populateHeaders(qpid::messaging::Me
         if (messageProperties->hasContentEncoding()) {
             message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
         }
-        //    routing-key, timestamp, others?
+        //    routing-key, others?
         if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
             message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
         }
-        if (deliveryProperties && deliveryProperties->hasTimestamp()) {
-            message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
-        }
     }
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Fri Oct 21 01:19:00 2011
@@ -72,7 +72,7 @@ class IncomingMessages
     bool get(Handler& handler, qpid::sys::Duration timeout);
     bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
     void accept();
-    void accept(qpid::framing::SequenceNumber id, bool cumulative);
+    void accept(qpid::framing::SequenceNumber id);
     void releaseAll();
     void releasePending(const std::string& destination);
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Fri Oct 21 01:19:00 2011
@@ -59,9 +59,7 @@ void OutgoingMessage::convert(const qpid
         message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
     }
     translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
-    if (from.getTtl().getMilliseconds()) {
-        message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
-    }
+    message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
     if (from.getDurable()) {
         message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT);
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Fri Oct 21 01:19:00 2011
@@ -135,7 +135,6 @@ void SenderImpl::sendUnreliable(const qp
 void SenderImpl::replay(const sys::Mutex::ScopedLock&)
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
-        i->message.setRedelivered(true);
         sink->send(session, name, *i);
     }
 }
@@ -148,7 +147,7 @@ uint32_t SenderImpl::checkPendingSends(b
 uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
 {
     if (flush) {
-        session.flush();
+        session.flush(); 
         flushed = true;
     } else {
         flushed = false;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Oct 21 01:19:00 2011
@@ -60,14 +60,12 @@ SessionImpl::SessionImpl(ConnectionImpl&
 
 void SessionImpl::checkError()
 {
-    ScopedLock l(lock);
     qpid::client::SessionBase_0_10Access s(session);
     s.get()->assertOpen();
 }
 
 bool SessionImpl::hasError()
 {
-    ScopedLock l(lock);
     qpid::client::SessionBase_0_10Access s(session);
     return s.get()->hasError();
 }
@@ -114,14 +112,13 @@ void SessionImpl::release(qpid::messagin
     execute1<Release>(m);
 }
 
-void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledge(qpid::messaging::Message& m)
 {
     //Should probably throw an exception on failure here, or indicate
     //it through a return type at least. Failure means that the
     //message may be redelivered; i.e. the application cannot delete
     //any state necessary for preventing reprocessing of the message
-    Acknowledge2 ack(*this, m, cumulative);
-    execute(ack);
+    execute1<Acknowledge1>(m);
 }
 
 void SessionImpl::close()
@@ -131,29 +128,27 @@ void SessionImpl::close()
         senders.clear();
         receivers.clear();
     } else {
-        Senders sCopy;
-        Receivers rCopy;
-        {
-            ScopedLock l(lock);
-            senders.swap(sCopy);
-            receivers.swap(rCopy);
-        }
-        for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
-        {
-            // outside the lock, will call senderCancelled
-            i->second.close();
+        while (true) {
+            Sender s;
+            {
+                ScopedLock l(lock);
+                if (senders.empty()) break;
+                s = senders.begin()->second;
+            }
+            s.close();  // outside the lock, will call senderCancelled
         }
-        for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
-        {
-            // outside the lock, will call receiverCancelled
-            i->second.close();
+        while (true) {
+            Receiver r;
+            {
+                ScopedLock l(lock);
+                if (receivers.empty()) break;
+                r = receivers.begin()->second;
+            }
+            r.close();  // outside the lock, will call receiverCancelled
         }
     }
     connection->closed(*this);
-    if (!hasError()) {
-        ScopedLock l(lock);
-        session.close();
-    }
+    if (!hasError()) session.close();
 }
 
 template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
@@ -436,11 +431,8 @@ uint32_t SessionImpl::getUnsettledAcksIm
 
 void SessionImpl::syncImpl(bool block)
 {
-    {
-        ScopedLock l(lock);
-        if (block) session.sync();
-        else session.flush();
-    }
+    if (block) session.sync();
+    else session.flush();
     //cleanup unconfirmed accept records:
     incoming.pendingAccept();
 }
@@ -475,10 +467,10 @@ void SessionImpl::acknowledgeImpl()
     if (!transactional) incoming.accept();
 }
 
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
 {
     ScopedLock l(lock);
-    if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
+    if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
 }
 
 void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -517,7 +509,7 @@ void SessionImpl::senderCancelled(const 
 
 void SessionImpl::reconnect()
 {
-    connection->reopen();
+    connection->open();
 }
 
 bool SessionImpl::backoff()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/amqp0_10/SessionImpl.h Fri Oct 21 01:19:00 2011
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messagi
     void acknowledge(bool sync);
     void reject(qpid::messaging::Message&);
     void release(qpid::messaging::Message&);
-    void acknowledge(qpid::messaging::Message& msg, bool cumulative);
+    void acknowledge(qpid::messaging::Message& msg);
     void close();
     void sync(bool block);
     qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messagi
     void commitImpl();
     void rollbackImpl();
     void acknowledgeImpl();
-    void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
+    void acknowledgeImpl(qpid::messaging::Message&);
     void rejectImpl(qpid::messaging::Message&);
     void releaseImpl(qpid::messaging::Message&);
     void closeImpl();
@@ -204,13 +204,12 @@ class SessionImpl : public qpid::messagi
         void operator()() { impl.releaseImpl(message); }
     };
 
-    struct Acknowledge2 : Command
+    struct Acknowledge1 : Command
     {
         qpid::messaging::Message& message;
-        bool cumulative;
 
-        Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
-        void operator()() { impl.acknowledgeImpl(message, cumulative); }
+        Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+        void operator()() { impl.acknowledgeImpl(message); }
     };
 
     struct CreateSender;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SaslFactory.cpp Fri Oct 21 01:19:00 2011
@@ -71,7 +71,7 @@ class WindowsSasl : public Sasl
   public:
     WindowsSasl( const std::string &, const std::string &, const std::string &, const std::string &, int, int );
     ~WindowsSasl();
-    bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
+    std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
     std::string step(const std::string& challenge);
     std::string getMechanism();
     std::string getUserId();
@@ -121,8 +121,8 @@ WindowsSasl::~WindowsSasl() 
 {
 }
 
-bool WindowsSasl::start(const std::string& mechanisms, std::string& response,
-                        const SecuritySettings* /*externalSettings*/)
+std::string WindowsSasl::start(const std::string& mechanisms,
+                               const SecuritySettings* /*externalSettings*/)
 {
     QPID_LOG(debug, "WindowsSasl::start(" << mechanisms << ")");
 
@@ -142,18 +142,18 @@ bool WindowsSasl::start(const std::strin
     if (!haveAnon && !havePlain)
         throw InternalErrorException(QPID_MSG("Sasl error: no common mechanism"));
 
+    std::string resp = "";
     if (havePlain) {
         mechanism = PLAIN;
-        response = ((char)0) + settings.username + ((char)0) + settings.password;
+        resp = ((char)0) + settings.username + ((char)0) + settings.password;
     }
     else {
         mechanism = ANONYMOUS;
-        response = "";
     }
-    return true;
+    return resp;
 }
 
-std::string WindowsSasl::step(const std::string& /*challenge*/)
+std::string WindowsSasl::step(const std::string& challenge)
 {
     // Shouldn't get this for PLAIN...
     throw InternalErrorException(QPID_MSG("Sasl step error"));
@@ -169,7 +169,7 @@ std::string WindowsSasl::getUserId()
     return std::string(); // TODO - when GSSAPI is supported, return userId for connection.
 }
 
-std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t /*maxFrameSize*/)
+std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize)
 {
     return std::auto_ptr<SecurityLayer>(0);
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/client/windows/SslConnector.cpp Fri Oct 21 01:19:00 2011
@@ -77,7 +77,7 @@ public:
                  framing::ProtocolVersion pVersion,
                  const ConnectionSettings&, 
                  ConnectionImpl*);
-    virtual void connect(const std::string& host, const std::string& port);
+    virtual void connect(const std::string& host, int port);
     virtual void connected(const Socket&);
     unsigned int getSSF();
 };
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
 
   // Will this get reach via virtual method via boost::bind????
 
-void SslConnector::connect(const std::string& host, const std::string& port) {
+void SslConnector::connect(const std::string& host, int port) {
     brokerHost = host;
     TCPConnector::connect(host, port);
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.cpp Fri Oct 21 01:19:00 2011
@@ -36,45 +36,45 @@
  *
  * IMPORTANT NOTE: any time code is added to the broker that uses timers,
  * the cluster may need to be updated to take account of this.
- *
+ * 
  *
  * USE OF TIMESTAMPS IN THE BROKER
- *
+ *  
  * The following are the current areas where broker uses timers or timestamps:
- *
+ * 
  * - Producer flow control: broker::SemanticState uses
  *   connection::getClusterOrderOutput.  a FrameHandler that sends
  *   frames to the client via the cluster. Used by broker::SessionState
- *
+ *   
  * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
  *   implemented by cluster::ExpiryPolicy.
- *
+ * 
  * - Connection heartbeat: sends connection controls, not part of
  *   session command counting so OK to ignore.
- *
+ * 
  * - LinkRegistry: only cluster elder is ever active for links.
- *
+ * 
  * - management::ManagementBroker: uses MessageHandler supplied by  cluster
  *   to send messages to the broker via the cluster.
+ *   
+ * - Dtx: not yet supported with cluster.  
  *
- * cluster::ExpiryPolicy uses cluster time.
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
  *
  * ClusterTimer implements periodic timed events in the cluster context.
- * Used for:
- * - periodic management events.
- * - DTX transaction timeouts.
+ * Used for periodic management events.
  *
  * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ * 
  * Messages sent to/from CPG are called Events.
  *
  * An Event carries a ConnectionId, which includes a MemberId and a
  * connection number.
- *
+ * 
  * Events are either
  *  - Connection events: non-0 connection number and are associated with a connection.
  *  - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ * 
  * Events are further categorized as:
  *  - Control: carries method frame(s) that affect cluster behavior.
  *  - Data: carries raw data received from a client connection.
@@ -146,7 +146,6 @@
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterClockBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionAbortBody.h"
 #include "qpid/framing/ClusterRetractOfferBody.h"
@@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -215,7 +214,7 @@ struct ClusterDispatcher : public framin
     {
         cluster.initialStatus(
             member, version, active, clusterId,
-            framing::cluster::StoreState(storeState), shutdownId,
+            framing::cluster::StoreState(storeState), shutdownId, 
             firstConfig, l);
     }
     void ready(const std::string& url) {
@@ -231,21 +230,21 @@ struct ClusterDispatcher : public framin
         cluster.updateOffer(member, updatee, l);
     }
     void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
+    void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
     void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
         cluster.errorCheck(member, type, frameSeq, l);
     }
     void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
-    void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
+    void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
     void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
     void deliverToQueue(const std::string& queue, const std::string& message) {
         cluster.deliverToQueue(queue, message, l);
     }
-    void clock(uint64_t time) { cluster.clock(time, l); }
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
 Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
-    settings(set),
+    settings(set), 
     broker(b),
     mgmtObject(0),
     poller(b.getPoller()),
@@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings& 
     self(cpg.self()),
     clusterId(true),
     mAgent(0),
-    expiryPolicy(new ExpiryPolicy(*this)),
+    expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings& 
     lastBroker(false),
     updateRetracted(false),
     updateClosed(false),
-    error(*this),
-    acl(0)
+    error(*this)
 {
-    broker.setInCluster(true);
-
     // We give ownership of the timer to the broker and keep a plain pointer.
     // This is OK as it means the timer has the same lifetime as the broker.
     timer = new ClusterTimer(*this);
@@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings& 
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        clusterId = store.getClusterId();
+        clusterId = store.getClusterId(); 
         QPID_LOG(notice, "Cluster store state: " << store)
             }
     cpg.join(name);
@@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const 
     // Safe to use connections here because we're pre-catchup, stalled
     // and discarding, so deliveredFrame is not processing any
     // connection events.
-    assert(discarding);
+    assert(discarding);         
     pair<ConnectionMap::iterator, bool> ib
         = connections.insert(ConnectionMap::value_type(c->getId(), c));
-    // Like this to avoid tripping up unused variable warning when NDEBUG set
-    if (!ib.second) assert(ib.second);
+    assert(ib.second);
 }
 
 void Cluster::erase(const ConnectionId& id) {
-    Lock l(lock);
+    Lock l(lock);    
     erase(id,l);
 }
 
@@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() cons
 
 std::vector<Url> Cluster::getUrls(Lock&) const {
     return map.memberUrls();
-}
+} 
 
-void Cluster::leave() {
+void Cluster::leave() { 
     Lock l(lock);
     leave(l);
 }
@@ -410,7 +405,7 @@ void Cluster::leave() {
         QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
     } do {} while(0)
 
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -429,7 +424,7 @@ void Cluster::deliver(
     uint32_t nodeid,
     uint32_t pid,
     void* msg,
-    int msg_len)
+    int msg_len) 
 {
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event
         EventFrame ef(e, e.getFrame());
         // Stop the deliverEventQueue on update offers.
         // This preserves the connection decoder fragments for an update.
-        // Only do this for the two brokers that are directly involved in this
+        // Only do this for the two brokers that are directly involved in this 
         // offer: the one making the offer, or the one receiving it.
         const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
         if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event
         }
         deliverFrame(ef);
     }
-    else if(!discarding) {
+    else if(!discarding) { 
         if (e.isControl())
             deliverFrame(EventFrame(e, e.getFrame()));
         else {
@@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const Event
         // the event queue.
         e.frame = AMQFrame(
             ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
-        deliverEventQueue.start();
+        deliverEventQueue.start(); 
     }
     // Process each frame through the error checker.
     if (error.isUnresolved()) {
@@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const Event
         while (error.canProcess())  // There is a frame ready to process.
             processFrame(error.getNext(), l);
     }
-    else
+    else 
         processFrame(e, l);
 }
 
 
 void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
-        QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
+        QPID_LOG(trace, *this << " DLVR: " << e);
         ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
         if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
             throw Exception(QPID_MSG("Invalid cluster control"));
@@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFr
         map.incrementFrameSeq();
         ConnectionPtr connection = getConnection(e, l);
         if (connection) {
-            QPID_LOG_IF(trace, loggable(e.frame),
-                        *this << " DLVR " << map.getFrameSeq() << ":  " << e);
+            QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ":  " << e);
             connection->deliveredFrame(e);
         }
         else
-            throw Exception(QPID_MSG("Unknown connection: " << e));
+            QPID_LOG(trace, *this << " DROP (no connection): " << e);
     }
     else // Drop connection frames while state < CATCHUP
-        QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
+        QPID_LOG(trace, *this << " DROP (joining): " << e);
 }
 
 // Called in deliverFrameQueue thread
@@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getCo
 }
 
 // CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
     const cpg_name */*group*/,
     const cpg_address *members, int nMembers,
@@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) {
 }
 
 // Set the management status from the Cluster::state.
-//
+// 
 // NOTE: Management updates are sent based on property changes.  In
 // order to keep consistency across the cluster, we touch the local
 // management status property even if it is locally unchanged for any
@@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) {
 }
 
 void Cluster::initMapCompleted(Lock& l) {
-    // Called on completion of the initial status map.
+    // Called on completion of the initial status map.    
     QPID_LOG(debug, *this << " initial status map complete. ");
     setMgmtStatus(l);
     if (state == PRE_INIT) {
@@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l) 
         else {                  // I can go ready.
             discarding = false;
             setReady(l);
-            // Must be called *before* memberUpdate so first update will be generated.
-            failoverExchange->setReady();
             memberUpdate(l);
             updateMgmtMembership(l);
             mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -709,8 +701,8 @@ void Cluster::configChange(const MemberI
     if (initMap.isResendNeeded()) {
         mcast.mcastControl(
             ClusterInitialStatusBody(
-                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
-                store.getState(), store.getShutdownId(),
+                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
+                store.getState(), store.getShutdownId(), 
                 initMap.getFirstConfigStr()
             ),
             self);
@@ -725,20 +717,6 @@ void Cluster::configChange(const MemberI
     updateMgmtMembership(l);     // Update on every config change for consistency
 }
 
-struct ClusterClockTask : public sys::TimerTask {
-    Cluster& cluster;
-    sys::Timer& timer;
-
-    ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
-      : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
-
-    void fire() {
-      cluster.sendClockUpdate();
-      setupNextFire();
-      timer.add(this);
-    }
-};
-
 void Cluster::becomeElder(Lock&) {
     if (elder) return;          // We were already the elder.
     // We are the oldest, reactive links if necessary
@@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) {
     elder = true;
     broker.getLinks().setPassive(false);
     timer->becomeElder();
-
-    clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
 }
 
 void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() {
 // point we know the poller has stopped so no poller callbacks will be
 // invoked. We must ensure that CPG has also shut down so no CPG
 // callbacks will be invoked.
-//
+// 
 void Cluster::brokerShutdown()  {
     sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     try { cpg.shutdown(); }
@@ -799,7 +775,7 @@ void Cluster::updateRequest(const Member
 }
 
 void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
-                            const framing::Uuid& id,
+                            const framing::Uuid& id, 
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
                             const std::string& firstConfig,
@@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId
     else if (updatee == self && url) {
         assert(state == JOINER);
         state = UPDATEE;
-        acl = broker.getAcl();
-        broker.setAcl(0);       // Disable ACL during update
         QPID_LOG(notice, *this << " receiving update from " << updater);
         checkUpdateIn(l);
     }
@@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId
     if (updatee != self && url) {
         QPID_LOG(debug, debugSnapshot());
         if (mAgent) mAgent->clusterUpdate();
-        // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
+        // Updatee will call clusterUpdate when update completes
     }
 }
 
@@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) {
     if (!updateClosed) return;  // Wait till update connection closes.
     if (updatedMap) { // We're up to date
         map = *updatedMap;
+        failoverExchange->setUrls(getUrls(l));
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         memberUpdate(l);
-        // Must be called *after* memberUpdate() to avoid sending an extra update.
-        failoverExchange->setReady();
         // NB: don't updateMgmtMembership() here as we are not in the deliver
         // thread. It will be updated on delivery of the "ready" we just mcast.
         broker.setClusterUpdatee(false);
-        broker.setAcl(acl);     // Restore ACL
         discarding = false;     // OK to set, we're stalled for update.
         QPID_LOG(notice, *this << " update complete, starting catch-up.");
         QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
@@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) {
             mAgent->suppress(false); // Enable management output.
             mAgent->clusterUpdate();
         }
-        // Restore alternate exchange settings on exchanges.
-        broker.getExchanges().eachExchange(
-            boost::bind(&broker::Exchange::recoveryComplete, _1,
-                        boost::ref(broker.getExchanges())));
         enableClusterSafe();    // Enable cluster-safe assertions
         deliverEventQueue.start();
     }
@@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) {
 
 void Cluster::updateOutError(const std::exception& e)  {
     Monitor::ScopedLock l(lock);
-    QPID_LOG(error, *this << " error sending update: " << e.what());
+    QPID_LOG(error, *this << " error sending update: " << e.what());    
     updateOutDone(l);
 }
 
@@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) {
 void Cluster::updateMgmtMembership(Lock& l) {
     if (!mgmtObject) return;
     std::vector<Url> urls = getUrls(l);
-    mgmtObject->set_clusterSize(urls.size());
+    mgmtObject->set_clusterSize(urls.size()); 
     string urlstr;
     for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
         if (i != urls.begin()) urlstr += ";";
@@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& u
     QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
 }
 
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+    expiryPolicy->deliverExpire(id);
+}
+
 void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
     // If we see an errorCheck here (rather than in the ErrorCheck
     // class) then we have processed succesfully past the point of the
@@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::
     q->deliver(msg);
 }
 
-sys::AbsTime Cluster::getClusterTime() {
-    Mutex::ScopedLock l(lock);
-    return clusterTime;
-}
-
-// This method is called during update on the updatee to set the initial cluster time.
-void Cluster::clock(const uint64_t time) {
-    Mutex::ScopedLock l(lock);
-    clock(time, l);
-}
-
-// called when broadcast message received
-void Cluster::clock(const uint64_t time, Lock&) {
-    clusterTime = AbsTime(EPOCH, time);
-    AbsTime now = AbsTime::now();
-
-    if (!elder) {
-      clusterTimeOffset = Duration(now, clusterTime);
-    }
-}
-
-// called by elder timer to send clock broadcast
-void Cluster::sendClockUpdate() {
-    Mutex::ScopedLock l(lock);
-    int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
-    nanosecondsSinceEpoch += clusterTimeOffset;
-    mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
-}
-
 bool Cluster::deferDeliveryImpl(const std::string& queue,
                                 const boost::intrusive_ptr<broker::Message>& msg)
 {
@@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const st
     return true;
 }
 
-bool Cluster::loggable(const AMQFrame& f) {
-    const  AMQMethodBody* method = (f.getMethod());
-    if (!method) return true;     // Not a method
-    bool isClock = method->amqpClassId() ==  ClusterClockBody::CLASS_ID
-        && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
-    return !isClock;
-}
-
 }} // namespace qpid::cluster



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org