You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/06/08 17:31:32 UTC

svn commit: r952692 - in /qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Tue Jun  8 15:31:31 2010
New Revision: 952692

URL: http://svn.apache.org/viewvc?rev=952692&view=rev
Log:
Cluster handle connection-negotiation phase in local broker.

The connection negotiation phase up to the "open" or "open-ok" frame
establishes whether/what encryption to use for the rest of the
connection.

With this patch a cluster broker completes the initial negotiation
with its local clients and only then begins multicasting to other
brokers. The local broker decrypts if necessary and multicasts in the
clear.

This replaces a problematic locking scheme that was formerly in place
which caused deadlocks.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun  8 15:31:31 2010
@@ -386,7 +386,6 @@ void Connection::restartTimeout()
         timeoutTimer->touch();
 }
 
-
-
+bool Connection::isOpen() { return adapter.isOpen(); }
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun  8 15:31:31 2010
@@ -63,9 +63,6 @@ class LinkRegistry;
 class SecureConnection;
 struct ConnectionTimeoutTask;
 
-typedef boost::function<void ( std::string& )> userIdCallback;
-
-
 class Connection : public sys::ConnectionInputHandler,
                    public ConnectionState,
                    public RefCounted
@@ -146,9 +143,8 @@ class Connection : public sys::Connectio
         return securitySettings;
     }
 
-    void setUserIdCallback ( UserIdCallback fn ) {
-        adapter.setUserIdCallback ( fn );
-    }
+    /** @return true if the initial connection negotiation is complete. */
+    bool isOpen();
 
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Jun  8 15:31:31 2010
@@ -87,7 +87,8 @@ ConnectionHandler::ConnectionHandler(Con
 
 ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
     proxy(c.getOutput()),
-    connection(c), serverMode(!isClient), acl(0), secured(0), userIdCallback(0)
+    connection(c), serverMode(!isClient), acl(0), secured(0),
+    isOpen(false)
 {
     if (serverMode) {
 
@@ -195,14 +196,7 @@ void ConnectionHandler::Handler::open(co
         if (sl.get()) secured->activateSecurityLayer(sl);
     }
 
-    if ( userIdCallback ) {
-        string s;
-        // Not checking the return value of getUsername, if there is
-        // no username then we want to call the userIdCallback anyway
-        // with an empty string.
-        authenticator->getUsername(s);
-        userIdCallback(s);
-    }
+    isOpen = true;
     proxy.openOk(array);
 }
 
@@ -272,6 +266,7 @@ void ConnectionHandler::Handler::openOk(
         Url url((*i)->get<std::string>());
         connection.getKnownHosts().push_back(url);
     }
+    isOpen = true;
 }
 
 void ConnectionHandler::Handler::redirect(const string& /*host*/, const framing::Array& /*knownHosts*/)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Jun  8 15:31:31 2010
@@ -40,9 +40,6 @@ namespace broker {
 class Connection;
 class SecureConnection;
 
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-
 class ConnectionHandler : public framing::FrameHandler
 {
     struct Handler : public framing::AMQP_AllOperations::ConnectionHandler
@@ -53,6 +50,7 @@ class ConnectionHandler : public framing
         std::auto_ptr<SaslAuthenticator> authenticator;
         AclModule* acl;
         SecureConnection* secured;
+        bool isOpen;
 
         Handler(Connection& connection, bool isClient, bool isShadow=false);
         ~Handler();
@@ -67,10 +65,6 @@ class ConnectionHandler : public framing
         void close(uint16_t replyCode, const std::string& replyText);
         void closeOk();
 
-        UserIdCallback userIdCallback;
-        void setUserIdCallback ( UserIdCallback fn ) { userIdCallback = fn; };
-
-
         void start(const qpid::framing::FieldTable& serverProperties,
                    const framing::Array& mechanisms,
                    const framing::Array& locales);
@@ -95,9 +89,7 @@ class ConnectionHandler : public framing
     void heartbeat();
     void handle(framing::AMQFrame& frame);
     void setSecureConnection(SecureConnection* secured);
-    void setUserIdCallback ( UserIdCallback fn ) {
-      handler->setUserIdCallback ( fn );
-    }
+    bool isOpen() { return handler->isOpen; }
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.h Tue Jun  8 15:31:31 2010
@@ -36,12 +36,6 @@ namespace broker {
 
 class Connection;
 
-// Calls your fn with the user ID string, just 
-// after the security negotiation is complete.
-// Add your callback to the list with addUserIdCallback().
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-
 class SaslAuthenticator
 {
 public:
@@ -54,7 +48,6 @@ public:
     virtual void getError(std::string&) {}
     virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;
 
-    virtual void setUserIdCallback ( UserIdCallback ) { }
     static bool available(void);
 
     // Initialize the SASL mechanism; throw if it fails.
@@ -64,9 +57,6 @@ public:
     static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow);
 
     virtual void callUserIdCallbacks() { }
-
-private:
-    UserIdCallback userIdCallback;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun  8 15:31:31 2010
@@ -39,7 +39,6 @@
 #include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
@@ -48,15 +47,6 @@
 #include <boost/current_function.hpp>
 
 
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-// TODO aconway 2008-11-03:
-// 
-// Refactor code for receiving an update into a separate UpdateConnection
-// class.
-//
-
-
 namespace qpid {
 namespace cluster {
 
@@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::
       expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
       updateIn(c.getUpdateReceiver()),
-      secureConnection(0),
-      mcastSentButNotReceived(false),
-      inConnectionNegotiation(true)
-{ }
+      secureConnection(0)
+{}
 
 // Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::
     expectProtocolHeader(isLink),
     mcastFrameHandler(cluster.getMulticast(), self),
     updateIn(c.getUpdateReceiver()),
-    secureConnection(0),
-    mcastSentButNotReceived(false),
-    inConnectionNegotiation(true)
+    secureConnection(0)
 {
     cluster.addLocalConnection(this);
     if (isLocalClient()) {
@@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::
         // and initialized when the announce is received.
         QPID_LOG(info, "new client connection " << *this);
         giveReadCredit(cluster.getSettings().readMax); // Flow control
-        cluster.getMulticast().mcastControl(
-            ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
-                                          connectionCtor.external.ssf,
-                                          connectionCtor.external.authid,
-                                          connectionCtor.external.nodict), getId());
+        init();
     }
     else {
         // Catch-up shadow connections initialized using nextShadow id.
@@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::
 }
 
 void Connection::setSecureConnection(broker::SecureConnection* sc) {
-  secureConnection = sc;
+    secureConnection = sc;
+    if (connection.get()) connection->setSecureConnection(sc);
 }
 
 void Connection::init() {
@@ -155,30 +138,33 @@ void Connection::init() {
     }
     if (!isCatchUp())
         connection->setErrorListener(this);
-    UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
-    connection->setUserIdCallback ( fn );
 }
 
 // Called when we have consumed a read buffer to give credit to the
 // connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
-    {
-        sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-        if (inConnectionNegotiation) {
-            mcastSentButNotReceived = false;
-            connectionNegotiationMonitor.notify();
-        }
-    }
     if (cluster.getSettings().readMax && credit) 
         output.giveReadCredit(credit);
 }
 
-void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
+void Connection::announce(
+    const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
+    const std::string& username, const std::string& initialFrames)
+{
     QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
     QPID_ASSERT(ssf == connectionCtor.external.ssf);
     QPID_ASSERT(authid == connectionCtor.external.authid);
     QPID_ASSERT(nodict == connectionCtor.external.nodict);
-    init();
+    // Local connections are already initialized.
+    if (isShadow()) {
+        init();
+        // Play initial frames into the connection.
+        Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
+        AMQFrame frame;
+        while (frame.decode(buf))
+            connection->received(frame);
+         connection->setUserId(username);
+    }
 }
 
 Connection::~Connection() {
@@ -201,7 +187,6 @@ void Connection::received(framing::AMQFr
     if (isLocal()) {            // Local catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
-
             connection->received(f);
     }
     else {             // Shadow or updated catch-up connection.
@@ -235,7 +220,7 @@ struct GiveReadCreditOnExit {
     int credit;
     GiveReadCreditOnExit(Connection& connection_, int credit_) :
         connection(connection_), credit(credit_) {}
-    ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); }
+    ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
 };
 
 void Connection::deliverDoOutput(uint32_t limit) {
@@ -307,57 +292,76 @@ void Connection::abort() {
 }
 
 // ConnectionCodec::decode receives read buffers from  directly-connected clients.
-size_t Connection::decode(const char* buffer, size_t size) {
-
-    if (catchUp) {  // Handle catch-up locally.
-        Buffer buf(const_cast<char*>(buffer), size);
+size_t Connection::decode(const char* data, size_t size) {
+    GiveReadCreditOnExit grc(*this, 1);   // Give a read credit by default.
+    const char* ptr = data;
+    const char* end = data + size;
+    if (catchUp) {              // Handle catch-up locally.
+        Buffer buf(const_cast<char*>(ptr), size);
+        ptr += size;
         while (localDecoder.decode(buf))
             received(localDecoder.getFrame());
-        return buf.getPosition();
     }
     else {                      // Multicast local connections.
-        assert(isLocal());
-        const char* remainingData = buffer;
-        size_t remainingSize = size;
-
-        if (expectProtocolHeader) {
-            //If this is an outgoing link, we will receive a protocol
-            //header which needs to be decoded first
-            framing::ProtocolInitiation pi;
-            Buffer buf(const_cast<char*>(buffer), size);
-            if (pi.decode(buf)) {
-                //TODO: check the version is correct
-                QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
-                expectProtocolHeader = false;
-                remainingData = buffer + pi.encodedSize();
-                remainingSize = size - pi.encodedSize();
-            } else {
-                QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
-                giveReadCredit(1); // We're not going to mcast so give read credit now.
-                return 0;
-            }
-        }
-
-        // During connection negotiation wait for each multicast to be
-        // processed before sending the next, to ensure that the
-        // security layer is activated before we attempt to decode
-        // encrypted frames.
-        { 
-            sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-            if ( inConnectionNegotiation ) {
-                assert(!mcastSentButNotReceived);
-                mcastSentButNotReceived = true;
-            }
-        }
-        cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
-        {
-            sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-            if (inConnectionNegotiation)
-                while (mcastSentButNotReceived)
-                    connectionNegotiationMonitor.wait();
-            assert(!mcastSentButNotReceived);
-        }
-        return size;
+        assert(isLocalClient());
+        assert(connection.get());
+        if (!checkProtocolHeader(ptr, size)) // Updates ptr
+            return 0; // Incomplete header
+
+        if (!connection->isOpen()) 
+            processInitialFrames(ptr, end-ptr); // Updates ptr
+        
+        if (connection->isOpen() && end - ptr > 0) {
+            // We're multi-casting, we will give read credit on delivery.
+            grc.credit = 0;
+            cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
+            ptr = end;
+        }
+    }
+    return ptr - data;
+}
+
+// Decode the protocol header if needed. Updates data and size
+// returns true if the header is complete or already read.
+bool Connection::checkProtocolHeader(const char*& data, size_t size) {
+    if (expectProtocolHeader) {
+        //If this is an outgoing link, we will receive a protocol
+        //header which needs to be decoded first
+        framing::ProtocolInitiation pi;
+        Buffer buf(const_cast<char*&>(data), size);
+        if (pi.decode(buf)) {
+            //TODO: check the version is correct
+            QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+            expectProtocolHeader = false;
+            data += pi.encodedSize();
+        } else {
+            return false;
+        }
+    }
+    return true;
+}
+
+void Connection::processInitialFrames(const char*& ptr, size_t size) {
+    // Process the initial negotiation locally and store it so
+    // it can be replayed on other brokers in announce()
+    Buffer buf(const_cast<char*>(ptr), size);
+    framing::AMQFrame frame;
+    while (!connection->isOpen() && frame.decode(buf))
+        received(frame);
+    initialFrames.append(ptr, buf.getPosition());
+    ptr += buf.getPosition();
+    if (connection->isOpen()) { // initial negotiation complete
+        cluster.getMulticast().mcastControl(
+            ClusterConnectionAnnounceBody(
+                ProtocolVersion(),
+                connectionCtor.mgmtId,
+                connectionCtor.external.ssf,
+                connectionCtor.external.authid,
+                connectionCtor.external.nodict,
+                connection->getUserId(),
+                initialFrames),
+            getId());
+        initialFrames.clear();
     }
 }
 
@@ -574,21 +578,14 @@ void Connection::queue(const std::string
 }
 
 void Connection::sessionError(uint16_t , const std::string& msg) {
-    // If we are negotiating the connection when it fails just close the connectoin.
-    // If it fails after that then we have to flag the error to the cluster.
-    if (inConnectionNegotiation)
-        cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
-    else
+    // Ignore errors before isOpen(), we're not multicasting yet.
+    if (connection->isOpen())
         cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
-    
 }
 
 void Connection::connectionError(const std::string& msg) {
-    // If we are negotiating the connection when it fails just close the connectoin.
-    // If it fails after that then we have to flag the error to the cluster.
-    if (inConnectionNegotiation)
-        cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
-    else
+    // Ignore errors before isOpen(), we're not multicasting yet.
+    if (connection->isOpen())
         cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
 }
 
@@ -630,30 +627,5 @@ void Connection::managementAgents(const 
     QPID_LOG(debug, cluster << " updated management agents");
 }
 
-
-void Connection::mcastUserId ( std::string & id ) {
-    // Only the directly connected broker will mcast the secure user id, and only
-    // for client connections (not update connections)
-    if (isLocalClient())
-        cluster.getMulticast().mcastControl(
-            ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
-    {
-        // This call signals the end of the connection negotiation phase.
-        sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
-        inConnectionNegotiation = false;
-        mcastSentButNotReceived = false;
-        connectionNegotiationMonitor.notify();
-    }
-}
-
-// All connections, shadow or not, get this call.
-void Connection::secureUserId(const std::string& id) {
-    // Only set the user ID on shadow connections, and only if id is not the empty string.
-    if ( isShadow() && !id.empty() )
-        connection->setUserId ( id );
-}
-
-
-
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun  8 15:31:31 2010
@@ -164,8 +164,9 @@ class Connection :
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict);
-    void secureUserId(const std::string&);
+    void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid,
+                  bool nodict, const std::string& username,
+                  const std::string& initFrames);
     void abort();
     void deliverClose();
 
@@ -175,16 +176,8 @@ class Connection :
     void managementSchema(const std::string& data);
     void managementAgents(const std::string& data);
     void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
-
-    //uint32_t getSsf() const { return connectionCtor.external.ssf; }
-
     void setSecureConnection ( broker::SecureConnection * sc );
 
-    // This is a callback, registered with the broker connection.
-    // It gives me the user ID, if one is negotiated through Sasl.
-    void mcastUserId ( std::string & );
-
-
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -228,6 +221,8 @@ class Connection :
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverDoOutput(uint32_t limit);
 
+    bool checkProtocolHeader(const char*& data, size_t size);
+    void processInitialFrames(const char*& data, size_t size);
     boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
     broker::SessionState& sessionState();
     broker::SemanticState& semanticState();
@@ -247,13 +242,10 @@ class Connection :
     McastFrameHandler mcastFrameHandler;
     UpdateReceiver& updateIn;
     qpid::broker::SecureConnection* secureConnection;
+    std::string initialFrames;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
 
-    mutable sys::Monitor connectionNegotiationMonitor;
-    bool mcastSentButNotReceived;
-    bool inConnectionNegotiation;
-    
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jun  8 15:31:31 2010
@@ -61,7 +61,6 @@ void Multicaster::mcast(const Event& e) 
     QPID_LOG(trace, "MCAST " << e);
     if (bypass) {               // direct, don't queue
         iovec iov = e.toIovec();
-        // FIXME aconway 2010-03-10: should do limited retry.
         while (!cpg.mcast(&iov, 1))
             ;
     }

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jun  8 15:31:31 2010
@@ -510,7 +510,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBui
     Client c1(cluster[1], "c1");
     BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
     BOOST_CHECK_EQUAL(m.getData(), "abcd");
-    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());
 }
 
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
@@ -518,13 +518,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownH
     prepareArgs(args, durableFlag);
     ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0], "c0");
-    set<int> kb0 = knownBrokerPorts(c0.connection);
+    set<int> kb0 = knownBrokerPorts(c0.connection, 1);
     BOOST_CHECK_EQUAL(kb0.size(), 1u);
     BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
 
     cluster.add();
     Client c1(cluster[1], "c1");
-    set<int> kb1 = knownBrokerPorts(c1.connection);
+    set<int> kb1 = knownBrokerPorts(c1.connection, 2);
     kb0 = knownBrokerPorts(c0.connection, 2);
     BOOST_CHECK_EQUAL(kb1.size(), 2u);
     BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -532,7 +532,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownH
 
     cluster.add();
     Client c2(cluster[2], "c2");
-    set<int> kb2 = knownBrokerPorts(c2.connection);
+    set<int> kb2 = knownBrokerPorts(c2.connection, 3);
     kb1 = knownBrokerPorts(c1.connection, 3);
     kb0 = knownBrokerPorts(c0.connection, 3);
     BOOST_CHECK_EQUAL(kb2.size(), 3u);

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=952692&r1=952691&r2=952692&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun  8 15:31:31 2010
@@ -127,6 +127,10 @@
       <field name="authid" type="str16"/>
       <!-- exclude certain sasl mechs, used with ssl and sasl-external -->
       <field name="nodict" type="bit"/>
+      <!-- User name as negotiated by SASL -->
+      <field name="username" type="str32"/>
+      <!-- Frames forming the initial connection negotiation.  -->
+      <field name="initial-frames" type="str32"/>
     </control>
 
     <!-- Marks the cluster-wide point when a connection is considered closed. -->
@@ -263,11 +267,5 @@
     <control name="management-agents" code="0x37">
       <field name="data" type="vbin32"/>
     </control>
-
-    <!-- Announce the user ID on a secure connection -->
-    <control name="secureUserId" code="0x38">
-      <field name="secure-user-id" type="str16"/>
-    </control>
-
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org