You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/05/21 23:40:50 UTC

svn commit: r658886 - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/sys/ cpp/src/qpid/sys/posix/ python/commands/

Author: tross
Date: Wed May 21 14:40:49 2008
New Revision: 658886

URL: http://svn.apache.org/viewvc?rev=658886&view=rev
Log:
QPID-1087

Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolAccess.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/python/commands/qpid-route

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed May 21 14:40:49 2008
@@ -536,7 +536,6 @@
   qpid/sys/OutputControl.h \
   qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
-  qpid/sys/ProtocolAccess.h \
   qpid/sys/ProtocolFactory.h \
   qpid/sys/Runnable.h \
   qpid/sys/ScopedIncrement.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Wed May 21 14:40:49 2008
@@ -19,7 +19,6 @@
  *
  */
 #include "Connection.h"
-#include "qpid/sys/ProtocolAccess.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/exceptions.h"
 
@@ -28,13 +27,9 @@
 
 using sys::Mutex;
 
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a)
-    : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)),
-          identifier(id), initialized(false), isClient(_isClient)
-{
-    if (a != 0)
-        a->callConnCb(connection);
-}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+    : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+      identifier(id), initialized(false), isClient(_isClient) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
     framing::Buffer in(const_cast<char*>(buffer), size);
@@ -50,13 +45,13 @@
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        connection->received(frame);
+        connection.received(frame);
     }
     return in.getPosition();
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection->doOutput();
+    if (!frameQueueClosed) connection.doOutput();
     Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
@@ -95,7 +90,7 @@
 }
 
 void  Connection::closed() {
-    connection->closed();
+    connection.closed();
 }
 
 void Connection::send(framing::AMQFrame& f) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Wed May 21 14:40:49 2008
@@ -29,7 +29,6 @@
 #include <queue>
 
 namespace qpid {
-namespace sys { class ProtocolAccess; }
 namespace broker { class Broker; }
 namespace amqp_0_10 {
 
@@ -41,13 +40,13 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18: 
+    broker::Connection connection; // FIXME aconway 2008-03-18: 
     std::string identifier;
     bool initialized;
     bool isClient;
     
   public:
-    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0);
+    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool isClosed() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed May 21 14:40:49 2008
@@ -361,18 +361,20 @@
 // TODO: How to chose the protocolFactory to use for the connection
 void Broker::connect(
     const std::string& host, uint16_t port, bool /*useSsl*/,
-    sys::ConnectionCodec::Factory* f,
-    sys::ProtocolAccess* access)
+    boost::function2<void, int, std::string> failed,
+    sys::ConnectionCodec::Factory* f)
 {
-    getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
+    getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
 }
 
 void Broker::connect(
-    const Url& url, sys::ConnectionCodec::Factory* f)
+    const Url& url,
+    boost::function2<void, int, std::string> failed,
+    sys::ConnectionCodec::Factory* f)
 {
     url.throwIfEmpty();
     TcpAddress addr=boost::get<TcpAddress>(url[0]);
-    connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
+    connect(addr.host, addr.port, false, failed, f);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed May 21 14:40:49 2008
@@ -44,7 +44,6 @@
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
-#include "qpid/sys/ProtocolAccess.h"
 
 #include <vector>
 
@@ -135,10 +134,12 @@
 
     /** Create a connection to another broker. */
     void connect(const std::string& host, uint16_t port, bool useSsl,
-                 sys::ConnectionCodec::Factory* =0,
-                 sys::ProtocolAccess* =0);
+                 boost::function2<void, int, std::string> failed,
+                 sys::ConnectionCodec::Factory* =0);
     /** Create a connection to another broker. */
-    void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+    void connect(const Url& url,
+                 boost::function2<void, int, std::string> failed,
+                 sys::ConnectionCodec::Factory* =0);
 
     // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
     // For the present just return the first ProtocolFactory registered.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed May 21 14:40:49 2008
@@ -47,44 +47,26 @@
 namespace qpid {
 namespace broker {
 
-class Connection::MgmtClient : public Connection::MgmtWrapper
-{
-    management::Client::shared_ptr mgmtClient;
-
-public:
-    MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
-               const std::string& mgmtId, bool incoming);
-    ~MgmtClient();
-    void received(framing::AMQFrame& frame);
-    management::ManagementObject::shared_ptr getManagementObject() const;
-    void closing();
-};
-
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
     ConnectionState(out_, broker_),
-    adapter(*this, isLink),
+    adapter(*this, isLink_),
+    isLink(isLink_),
     mgmtClosing(false),
-    mgmtId(mgmtId_)
-{
-    initMgmt();
-}
-
-void Connection::initMgmt(bool asLink)
+    mgmtId(mgmtId_),
+    links(broker_.getLinks())
 {
     Manageable* parent = broker.GetVhostObject ();
 
+    if (isLink)
+        links.notifyConnection (mgmtId, this);
+
     if (parent != 0)
     {
         ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
 
         if (agent.get () != 0)
-        {
-            if (asLink) {
-                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
-            } else {
-                mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
-            }
-        }
+            mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
+        agent->addObject (mgmtObject);
     }
 }
 
@@ -95,19 +77,65 @@
 }
 
 
-Connection::~Connection () {}
+Connection::~Connection ()
+{
+    if (mgmtObject.get() != 0)
+        mgmtObject->resourceDestroy();
+    if (isLink)
+        links.notifyClosed (mgmtId);
+}
 
 void Connection::received(framing::AMQFrame& frame){
-    if (mgmtClosing)
-        close (403, "Closed by Management Request", 0, 0);
-
     if (frame.getChannel() == 0 && frame.getMethod()) {
         adapter.handle(frame);
     } else {
         getChannel(frame.getChannel()).in(frame);
     }
-    
-    if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+
+    if (isLink)
+        recordFromServer(frame);
+    else
+        recordFromClient(frame);
+}
+
+void Connection::recordFromServer (framing::AMQFrame& frame)
+{
+    if (mgmtObject.get () != 0)
+    {
+        mgmtObject->inc_framesToClient ();
+        mgmtObject->inc_bytesToClient (frame.size ());
+    }
+}
+
+void Connection::recordFromClient (framing::AMQFrame& frame)
+{
+    if (mgmtObject.get () != 0)
+    {
+        mgmtObject->inc_framesFromClient ();
+        mgmtObject->inc_bytesFromClient (frame.size ());
+    }
+}
+
+string Connection::getAuthMechanism()
+{
+    if (!isLink)
+        return string("ANONYMOUS");
+
+    return links.getAuthMechanism(mgmtId);
+}
+
+string Connection::getAuthCredentials()
+{
+    if (!isLink)
+        return string();
+
+    return links.getAuthCredentials(mgmtId);
+}
+
+void Connection::notifyConnectionForced(const string& text)
+{
+    if (isLink)
+        links.notifyConnectionForced(mgmtId, text);
 }
 
 void Connection::close(
@@ -125,7 +153,7 @@
 void Connection::closed(){ // Physically closed, suspend open sessions.
     try {
         while (!channels.empty()) 
-	    ptr_map_ptr(channels.begin())->handleDetach();
+            ptr_map_ptr(channels.begin())->handleDetach();
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
             q->releaseExclusiveOwnership();
@@ -147,10 +175,12 @@
         if (ioCallback)
             ioCallback(); // Lend the IO thread for management processing
         ioCallback = 0;
-        if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
 
-        //then do other output as needed:
-        return outputTasks.doOutput();
+        if (mgmtClosing)
+            close (403, "Closed by Management Request", 0, 0);
+        else
+            //then do other output as needed:
+            return outputTasks.doOutput();
     }catch(ConnectionException& e){
         close(e.code, e.what(), 0, 0);
     }catch(std::exception& e){
@@ -174,7 +204,7 @@
 
 ManagementObject::shared_ptr Connection::GetManagementObject (void) const
 {
-    return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+    return dynamic_pointer_cast<ManagementObject>(mgmtObject);
 }
 
 Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
@@ -187,7 +217,7 @@
     {
     case management::Client::METHOD_CLOSE :
         mgmtClosing = true;
-        if (mgmtWrapper.get()) mgmtWrapper->closing();
+        if (mgmtObject.get()) mgmtObject->set_closing(1);
         out->activateOutput();
         status = Manageable::STATUS_OK;
         break;
@@ -196,39 +226,5 @@
     return status;
 }
 
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
-                                   ManagementAgent::shared_ptr agent,
-                                   const std::string& mgmtId, bool incoming)
-{
-    mgmtClient = management::Client::shared_ptr
-        (new management::Client (conn, parent, mgmtId, incoming));
-    agent->addObject (mgmtClient);
-}
-
-Connection::MgmtClient::~MgmtClient()
-{
-    if (mgmtClient.get () != 0)
-        mgmtClient->resourceDestroy ();
-}
-
-void Connection::MgmtClient::received(framing::AMQFrame& frame)
-{
-    if (mgmtClient.get () != 0)
-    {
-        mgmtClient->inc_framesFromClient ();
-        mgmtClient->inc_bytesFromClient (frame.size ());
-    }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
-{
-    return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void Connection::MgmtClient::closing()
-{
-    if (mgmtClient) mgmtClient->set_closing (1);
-}
-
 }}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed May 21 14:40:49 2008
@@ -43,13 +43,14 @@
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Client.h"
-#include "qpid/management/Link.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 
 namespace qpid {
 namespace broker {
 
+class LinkRegistry;
+
 class Connection : public sys::ConnectionInputHandler, 
                    public ConnectionState
 {
@@ -62,7 +63,10 @@
     SessionHandler& getChannel(framing::ChannelId channel);
 
     /** Close the connection */
-    void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+    void close(framing::ReplyCode code = 403,
+               const string& text = string(),
+               framing::ClassId classId = 0,
+               framing::MethodId methodId = 0);
 
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
@@ -78,38 +82,26 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
 
-    void initMgmt(bool asLink = false);
     void requestIOProcessing (boost::function0<void>);
+    void recordFromServer (framing::AMQFrame& frame);
+    void recordFromClient (framing::AMQFrame& frame);
+    std::string getAuthMechanism();
+    std::string getAuthCredentials();
+    void notifyConnectionForced(const std::string& text);
 
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
-    /**
-     * Connection may appear, for the purposes of management, as a
-     * normal client initiated connection or as an agent initiated
-     * inter-broker link. This wrapper abstracts the common interface
-     * for both.
-     */
-    class MgmtWrapper
-    {
-    public:
-        virtual ~MgmtWrapper(){}
-        virtual void received(framing::AMQFrame& frame) = 0;
-        virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
-        virtual void closing() = 0;
-        virtual void processPending(){}
-        virtual void process(Connection&, const management::Args&){}
-    };
-    class MgmtClient;
-
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;
     ConnectionHandler adapter;
-    std::auto_ptr<MgmtWrapper> mgmtWrapper;
+    bool isLink;
     bool mgmtClosing;
     const std::string mgmtId;
     boost::function0<void> ioCallback;
+    management::Client::shared_ptr mgmtObject;
+    LinkRegistry& links;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Wed May 21 14:40:49 2008
@@ -39,9 +39,9 @@
 }
 
 sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
     // used to create connections from one broker to another
-    return new amqp_0_10::Connection(out, broker, id, true, a);
+    return new amqp_0_10::Connection(out, broker, id, true);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Wed May 21 14:40:49 2008
@@ -24,7 +24,6 @@
 #include "qpid/sys/ConnectionCodec.h"
 
 namespace qpid {
-namespace sys { class ProtocolAccess; }
 namespace broker {
 class Broker;
 
@@ -38,7 +37,7 @@
     create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
 
     sys::ConnectionCodec*
-    create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
+    create(sys::OutputControl&, const std::string& id);
 
   private:
     Broker& broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Wed May 21 14:40:49 2008
@@ -26,6 +26,7 @@
 #include "Connection.h"
 #include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/ServerInvoker.h"
+#include "qpid/framing/constants.h"
 #include "qpid/log/Statement.h"
 
 using namespace qpid;
@@ -123,6 +124,10 @@
     if (replyCode != 200) {
         QPID_LOG(warning, "Client closed connection with " << replyCode << ": " << replyText);
     }
+
+    if (replyCode == framing::connection::CONNECTION_FORCED)
+        connection.notifyConnectionForced(replyText);
+
     client.closeOk();
     connection.getOutput().close();
 } 
@@ -136,9 +141,10 @@
                                        const framing::Array& /*mechanisms*/,
                                        const framing::Array& /*locales*/)
 {
-    string response;
-    server.startOk(FieldTable(), ANONYMOUS, response, en_US);
-    connection.initMgmt(true);
+    string mechanism = connection.getAuthMechanism();
+    string response  = connection.getAuthCredentials();
+    
+    server.startOk(FieldTable(), mechanism, response, en_US);
 }
 
 void ConnectionHandler::Handler::secure(const string& /*challenge*/)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed May 21 14:40:49 2008
@@ -51,13 +51,11 @@
     : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), broker(_broker), state(0),
-      access(boost::bind(&Link::established, this),
-             boost::bind(&Link::closed, this, _1, _2),
-             boost::bind(&Link::setConnection, this, _1)),
       visitCount(0),
       currentInterval(1),
       closing(false),
-      channelCounter(1)
+      channelCounter(1),
+      connection(0)
 {
     if (parent != 0)
     {
@@ -75,8 +73,9 @@
 
 Link::~Link ()
 {
-    if (state == STATE_OPERATIONAL)
-        access.close();
+    if (state == STATE_OPERATIONAL && connection != 0)
+        connection->close();
+
     if (mgmtObject.get () != 0)
         mgmtObject->resourceDestroy ();
 }
@@ -95,13 +94,16 @@
     case STATE_WAITING     : mgmtObject->set_state("Waiting");     break;
     case STATE_CONNECTING  : mgmtObject->set_state("Connecting");  break;
     case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+    case STATE_FAILED      : mgmtObject->set_state("Failed");      break;
+    case STATE_CLOSED      : mgmtObject->set_state("Closed");      break;
     }
 }
 
 void Link::startConnectionLH ()
 {
     try {
-        broker->connect (host, port, useSsl, 0, &access);
+        broker->connect (host, port, useSsl,
+                         boost::bind (&Link::closed, this, _1, _2));
         setStateLH(STATE_CONNECTING);
     } catch(std::exception& e) {
         setStateLH(STATE_WAITING);
@@ -125,16 +127,21 @@
 {
     Mutex::ScopedLock mutex(lock);
 
+    connection = 0;
+
     if (state == STATE_OPERATIONAL)
         QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
 
-    connection.reset();
     for (Bridges::iterator i = active.begin(); i != active.end(); i++)
         created.push_back(*i);
     active.clear();
 
-    setStateLH(STATE_WAITING);
-    mgmtObject->set_lastError (text);
+    if (state != STATE_FAILED)
+    {
+        setStateLH(STATE_WAITING);
+        mgmtObject->set_lastError (text);
+    }
+
     if (closing)
         destroy();
 }
@@ -145,7 +152,10 @@
     Bridges toDelete;
 
     QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
-    connection.reset();
+    if (connection)
+        connection->close(403, "closed by management");
+
+    setStateLH(STATE_CLOSED);
 
     // Move the bridges to be deleted into a local vector so there is no
     // corruption of the iterator caused by bridge deletion.
@@ -168,10 +178,7 @@
 void Link::add(Bridge::shared_ptr bridge)
 {
     Mutex::ScopedLock mutex(lock);
-
     created.push_back (bridge);
-    if (state == STATE_OPERATIONAL && connection.get() != 0)
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::cancel(Bridge::shared_ptr bridge)
@@ -197,6 +204,9 @@
 {
     Mutex::ScopedLock mutex(lock);
 
+    if (state != STATE_OPERATIONAL)
+        return;
+
     //process any pending creates
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -207,12 +217,10 @@
     }
 }
 
-void Link::setConnection(Connection::shared_ptr c)
+void Link::setConnection(Connection* c)
 {
     Mutex::ScopedLock mutex(lock);
-
     connection = c;
-    connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::maintenanceVisit ()
@@ -231,6 +239,8 @@
             startConnectionLH();
         }
     }
+    else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 uint Link::nextChannel()
@@ -240,6 +250,14 @@
     return channelCounter++;
 }
 
+void Link::notifyConnectionForced(const string text)
+{
+    Mutex::ScopedLock mutex(lock);
+
+    setStateLH(STATE_FAILED);
+    mgmtObject->set_lastError(text);
+}
+
 void Link::setPersistenceId(uint64_t id) const
 {
     if (mgmtObject != 0 && persistenceId == 0)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Wed May 21 14:40:49 2008
@@ -27,7 +27,6 @@
 #include "PersistableConfig.h"
 #include "Bridge.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/ProtocolAccess.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Link.h"
@@ -57,7 +56,6 @@
             management::Link::shared_ptr mgmtObject;
             Broker* broker;
             int     state;
-            sys::ProtocolAccess access;
             uint32_t visitCount;
             uint32_t currentInterval;
             bool     closing;
@@ -66,21 +64,20 @@
             Bridges created;   // Bridges pending creation
             Bridges active;    // Bridges active
             uint channelCounter;
-            boost::shared_ptr<Connection> connection;
+            Connection* connection;
 
             static const int STATE_WAITING     = 1;
             static const int STATE_CONNECTING  = 2;
             static const int STATE_OPERATIONAL = 3;
+            static const int STATE_FAILED      = 4;
+            static const int STATE_CLOSED      = 5;
 
-            static const uint32_t MAX_INTERVAL = 16;
+            static const uint32_t MAX_INTERVAL = 32;
 
             void setStateLH (int newState);
             void startConnectionLH();        // Start the IO Connection
-            void established();              // Called when connection is created
-            void closed(int, std::string);   // Called when connection goes away
             void destroy();                  // Called when mgmt deletes this link
             void ioThreadProcessing();       // Called on connection's IO thread by request
-            void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
 
         public:
             typedef boost::shared_ptr<Link> shared_ptr;
@@ -106,6 +103,16 @@
             void add(Bridge::shared_ptr);
             void cancel(Bridge::shared_ptr);
 
+            void established();              // Called when connection is created
+            void closed(int, std::string);   // Called when connection goes away
+            void setConnection(Connection*); // Set pointer to the AMQP Connection
+
+            string getAuthMechanism() { return authMechanism; }
+            string getUsername()      { return username; }
+            string getPassword()      { return password; }
+
+            void notifyConnectionForced(const std::string text);
+            
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;
             uint64_t getPersistenceId() const { return persistenceId; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Wed May 21 14:40:49 2008
@@ -27,7 +27,7 @@
 using std::stringstream;
 using boost::intrusive_ptr;
 
-#define LINK_MAINT_INTERVAL 5
+#define LINK_MAINT_INTERVAL 2
 
 LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
 {
@@ -185,3 +185,56 @@
     return store;
 }
 
+void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+{
+    Mutex::ScopedLock locker(lock);
+    LinkMap::iterator l = links.find(key);
+    if (l != links.end())
+    {
+        l->second->established();
+        l->second->setConnection(c);
+    }
+}
+
+void LinkRegistry::notifyClosed(const std::string& key)
+{
+    Mutex::ScopedLock locker(lock);
+    LinkMap::iterator l = links.find(key);
+    if (l != links.end())
+        l->second->closed(0, "Closed by peer");
+}
+
+void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text)
+{
+    Mutex::ScopedLock locker(lock);
+    LinkMap::iterator l = links.find(key);
+    if (l != links.end())
+        l->second->notifyConnectionForced(text);
+}
+
+std::string LinkRegistry::getAuthMechanism(const std::string& key)
+{
+    Mutex::ScopedLock locker(lock);
+    LinkMap::iterator l = links.find(key);
+    if (l != links.end())
+        return l->second->getAuthMechanism();
+    return string("ANONYMOUS");
+}
+
+std::string LinkRegistry::getAuthCredentials(const std::string& key)
+{
+    Mutex::ScopedLock locker(lock);
+    LinkMap::iterator l = links.find(key);
+    if (l == links.end())
+        return string();
+
+    string result;
+    result += '\0';
+    result += l->second->getUsername();
+    result += '\0';
+    result += l->second->getPassword();
+
+    return result;
+}
+
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Wed May 21 14:40:49 2008
@@ -34,6 +34,7 @@
 namespace broker {
 
     class Broker;
+    class Connection;
     class LinkRegistry {
 
         // Declare a timer task to manage the establishment of link connections and the
@@ -106,6 +107,12 @@
          * Return the message store used.
          */
         MessageStore* getStore() const;
+
+        void notifyConnection (const std::string& key, Connection* c);
+        void notifyClosed     (const std::string& key);
+        void notifyConnectionForced    (const std::string& key, const std::string& text);
+        std::string getAuthMechanism   (const std::string& key);
+        std::string getAuthCredentials (const std::string& key);
     };
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Wed May 21 14:40:49 2008
@@ -36,14 +36,13 @@
     { delete [] bytes;}
 };
 
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) :
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
     readError(false),
-    isClient(false),
-    access(a)
+    isClient(false)
 {}
 
 AsynchIOHandler::~AsynchIOHandler() {
@@ -153,7 +152,7 @@
 
 void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
-        codec = factory->create(*this, identifier, access);
+        codec = factory->create(*this, identifier);
         write(framing::ProtocolInitiation(codec->getVersion()));
         return;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Wed May 21 14:40:49 2008
@@ -32,7 +32,6 @@
 }
 
 namespace sys {
-class ProtocolAccess;
 class AsynchIOHandler : public OutputControl {
     std::string identifier;
     AsynchIO* aio;
@@ -40,12 +39,11 @@
     ConnectionCodec* codec;
     bool readError;
     bool isClient;
-    ProtocolAccess* access;
 
     void write(const framing::ProtocolInitiation&);
 
   public:
-    AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0);
+    AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
     ~AsynchIOHandler();
     void init(AsynchIO* a, int numBuffs);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h Wed May 21 14:40:49 2008
@@ -29,7 +29,6 @@
 namespace qpid {
 
 namespace sys {
-class ProtocolAccess;
 
 /**
  * Interface of coder/decoder for a connection of a specific protocol
@@ -69,7 +68,7 @@
 
         /** Return "preferred" codec for outbound connections. */
         virtual ConnectionCodec* create(
-            OutputControl&, const std::string& id, ProtocolAccess* a = 0
+            OutputControl&, const std::string& id
         ) = 0;
     };
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h Wed May 21 14:40:49 2008
@@ -25,7 +25,7 @@
 #include <stdint.h>
 #include "qpid/SharedObject.h"
 #include "ConnectionCodec.h"
-#include "ProtocolAccess.h"
+#include <boost/function.hpp>
 
 namespace qpid {
 namespace sys {
@@ -43,7 +43,7 @@
         boost::shared_ptr<Poller>,
         const std::string& host, int16_t port,
         ConnectionCodec::Factory* codec,
-        ProtocolAccess* access = 0) = 0;
+        boost::function2<void, int, std::string> failed) = 0;
 };
 
 inline ProtocolFactory::~ProtocolFactory() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Wed May 21 14:40:49 2008
@@ -118,6 +118,7 @@
 
 private:
     Socket(IOHandlePrivate*);
+    mutable std::string connectname;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Wed May 21 14:40:49 2008
@@ -42,14 +42,15 @@
     AsynchIOProtocolFactory(int16_t port, int backlog);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, int16_t port,
-                 ConnectionCodec::Factory*, ProtocolAccess*);
+                 ConnectionCodec::Factory*,
+                 boost::function2<void, int, std::string> failed);
 
     uint16_t getPort() const;
     std::string getHost() const;
 
   private:
     void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
-                     bool isClient, ProtocolAccess*);
+                     bool isClient);
 };
 
 // Static instance to initialise plugin
@@ -74,31 +75,18 @@
 {}
 
 void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
-                                          ConnectionCodec::Factory* f, bool isClient,
-                                          ProtocolAccess* a) {
-    AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a);
-    AsynchIO* aio;
+                                          ConnectionCodec::Factory* f, bool isClient) {
+    AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
 
     if (isClient)
         async->setClient();
-    if (a == 0)
-        aio = new AsynchIO(s,
-                           boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                           boost::bind(&AsynchIOHandler::eof, async, _1),
-                           boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                           boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                           boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                           boost::bind(&AsynchIOHandler::idle, async, _1));
-    else {
-        aio = new AsynchIO(s,
-                           boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-                           boost::bind(&ProtocolAccess::closedEof, a, async),
-                           boost::bind(&AsynchIOHandler::disconnect, async, _1),
-                           boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-                           boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-                           boost::bind(&AsynchIOHandler::idle, async, _1));
-        a->setAio(aio);
-    }
+    AsynchIO* aio = new AsynchIO(s,
+                                 boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::eof, async, _1),
+                                 boost::bind(&AsynchIOHandler::disconnect, async, _1),
+                                 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+                                 boost::bind(&AsynchIOHandler::idle, async, _1));
 
     async->init(aio, 4);
     aio->start(poller);
@@ -116,8 +104,7 @@
                                      ConnectionCodec::Factory* fact) {
     acceptor.reset(
         new AsynchAcceptor(listener,
-                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false,
-                                       (ProtocolAccess*) 0)));
+                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
     acceptor->start(poller);
 }
 
@@ -125,7 +112,7 @@
     Poller::shared_ptr poller,
     const std::string& host, int16_t port,
     ConnectionCodec::Factory* fact,
-    ProtocolAccess* access)
+    boost::function2<void, int, std::string> failed)
 {
     // Note that the following logic does not cause a memory leak.
     // The allocated Socket is freed either by the AsynchConnector
@@ -135,8 +122,8 @@
 
     Socket* socket = new Socket();
     new AsynchConnector (*socket, poller, host, port,
-                         boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access),
-                         boost::bind(&ProtocolAccess::closed, access, _1, _2));
+                         boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true),
+                         failed);
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Wed May 21 14:40:49 2008
@@ -32,6 +32,7 @@
 #include <netdb.h>
 #include <cstdlib>
 #include <string.h>
+#include <iostream>
 
 #include <boost/format.hpp>
 
@@ -138,6 +139,10 @@
 
 void Socket::connect(const std::string& host, int port) const
 {
+    std::stringstream namestream;
+    namestream << host << ":" << port;
+    connectname = namestream.str();
+
     const int& socket = impl->fd;
     struct sockaddr_in name;
     name.sin_family = AF_INET;
@@ -240,6 +245,8 @@
 
 std::string Socket::getPeerAddress() const
 {
+    if (!connectname.empty())
+        return std::string (connectname);
     return getName(impl->fd, false, true);
 }
 

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=658886&r1=658885&r2=658886&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Wed May 21 14:40:49 2008
@@ -112,9 +112,12 @@
         connectArgs["port"]          = self.src.port
         connectArgs["useSsl"]        = False
         connectArgs["durable"]       = _durable
-        connectArgs["authMechanism"] = "PLAIN"
-        connectArgs["username"]      = self.src.username
-        connectArgs["password"]      = self.src.password
+        if self.src.username == "anonymous":
+            connectArgs["authMechanism"] = "ANONYMOUS"
+        else:
+            connectArgs["authMechanism"] = "PLAIN"
+            connectArgs["username"]      = self.src.username
+            connectArgs["password"]      = self.src.password
         res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
         if _verbose:
             print "Connect method returned:", res.status, res.statusText
@@ -164,9 +167,12 @@
             connectArgs["port"]          = self.src.port
             connectArgs["useSsl"]        = False
             connectArgs["durable"]       = _durable
-            connectArgs["authMechanism"] = "PLAIN"
-            connectArgs["username"]      = self.src.username
-            connectArgs["password"]      = self.src.password
+            if self.src.username == "anonymous":
+                connectArgs["authMechanism"] = "ANONYMOUS"
+            else:
+                connectArgs["authMechanism"] = "PLAIN"
+                connectArgs["username"]      = self.src.username
+                connectArgs["password"]      = self.src.password
             res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
             if _verbose:
                 print "Connect method returned:", res.status, res.statusText