You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/11 06:45:27 UTC

svn commit: r694113 - in /incubator/qpid/trunk/qpid/cpp: etc/ src/ src/qpid/client/ src/tests/

Author: astitcher
Date: Wed Sep 10 21:45:26 2008
New Revision: 694113

URL: http://svn.apache.org/viewvc?rev=694113&view=rev
Log:
Refactored c++ client library to allow multiple protocols to be
used simultaneously:
- Added in capability for client library plugins:
  Client library will load in plugin modules from
  the client library module directory on library load.
- Add protocol option into the standard client command line options
- Split plugin module load area into daemon and client;
  default daemon module directory is now <libdir>/qpid/daemon,
  default client module directory is <libdir>/qpid/client.
- Changed names of plugins to leave out libqpid prefix

Added:
    incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/etc/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/acl.mk
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/etc/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/etc/Makefile.am?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/etc/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/etc/Makefile.am Wed Sep 10 21:45:26 2008
@@ -4,6 +4,10 @@
 	$(SASL_CONF) \
 	qpidd qpidd.conf
 
+confdir=$(sysconfdir)/qpid
+nobase_conf_DATA=\
+	qpidc.conf
+
 nobase_sysconf_DATA = \
 	qpidd.conf
 

Added: incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf?rev=694113&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf (added)
+++ incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf Wed Sep 10 21:45:26 2008
@@ -0,0 +1,2 @@
+# Configuration file for the qpid client library. Entries are of the form:
+#  name = value

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 10 21:45:26 2008
@@ -51,12 +51,13 @@
 DISTCLEANFILES=qpid/framing/MaxMethodBodySize.h
 
 ## Compiler flags
-
 AM_CXXFLAGS = $(WARNING_CFLAGS)
 AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
 INCLUDES = -Igen -I$(srcdir)/gen
 
 ## Automake macros to build libraries and executables.
+qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(dmoduledir)\" -DCONF_FILE=\"$(sysconfdir)/qpidd.conf\"
+libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(cmoduledir)\" -DCONF_FILE=\"$(confdir)/qpidc.conf\"
 
 qpidd_LDADD =					\
   libqpidbroker.la				\
@@ -95,7 +96,15 @@
 platform_src = $(posix_plat_src) $(poller)
 platform_hdr = $(posix_plat_hdr)
 
-lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la 
+lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la
+
+# Definitions for client and daemon plugins
+PLUGINLDFLAGS=-no-undefined -module -avoid-version
+confdir=$(sysconfdir)/qpid
+dmoduledir=$(libdir)/qpid/daemon
+cmoduledir=$(libdir)/qpid/client
+dmodule_LTLIBRARIES =
+cmodule_LTLIBRARIES = 
 
 include cluster.mk
 include acl.mk
@@ -113,33 +122,35 @@
 if RDMA
 
 # RDMA (Infiniband) protocol code
-libqpidrdma_la_SOURCES = \
+librdmawrap_la_SOURCES = \
   qpid/sys/rdma/rdma_exception.h \
   qpid/sys/rdma/rdma_factories.cpp \
   qpid/sys/rdma/RdmaIO.cpp \
   qpid/sys/rdma/RdmaIO.h \
   qpid/sys/rdma/rdma_wrap.h
-libqpidrdma_la_LIBADD = \
+librdmawrap_la_LIBADD = \
+  libqpidcommon.la \
   -lrdmacm \
   -libverbs
-libqpidrdma_la_CXXFLAGS = \
+librdmawrap_la_CXXFLAGS = \
   $(AM_CXXFLAGS) -Wno-missing-field-initializers
-noinst_LTLIBRARIES += \
-  libqpidrdma.la
-qpidd_LDADD += \
-  libqpidrdma.la
+lib_LTLIBRARIES += \
+  librdmawrap.la
+librdmawrap_la_LDFLAGS = \
+  -no-undefined
 
+# RDMA test/sample programs
 noinst_PROGRAMS += RdmaServer RdmaClient
 RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
 RdmaServer_CXXFLAGS = \
   $(AM_CXXFLAGS) -Wno-missing-field-initializers
 RdmaServer_LDADD = \
-  libqpidrdma.la libqpidcommon.la
+  librdmawrap.la libqpidcommon.la
 RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp
 RdmaClient_CXXFLAGS = \
   $(AM_CXXFLAGS) -Wno-missing-field-initializers
 RdmaClient_LDADD = \
-  libqpidrdma.la libqpidcommon.la
+  librdmawrap.la libqpidcommon.la
 
 endif
 
@@ -329,16 +340,17 @@
   $(rgen_client_srcs)				\
   qpid/client/AckPolicy.cpp			\
   qpid/client/Bounds.cpp			\
-  qpid/client/ConnectionImpl.cpp		\
-  qpid/client/Connector.cpp			\
   qpid/client/Connection.cpp			\
   qpid/client/ConnectionHandler.cpp		\
+  qpid/client/ConnectionImpl.cpp \
   qpid/client/ConnectionSettings.cpp		\
+  qpid/client/Connector.cpp	\
   qpid/client/Demux.cpp				\
   qpid/client/Dispatcher.cpp			\
   qpid/client/Future.cpp			\
   qpid/client/FutureCompletion.cpp		\
   qpid/client/FutureResult.cpp			\
+  qpid/client/LoadPlugins.cpp			\
   qpid/client/LocalQueue.cpp			\
   qpid/client/Message.cpp			\
   qpid/client/MessageListener.cpp		\
@@ -579,7 +591,6 @@
 nobase_include_HEADERS += qpid/broker/XmlExchange.h
 endif
 
-
 # Force build of qpidd during dist phase so help2man will work.
 dist-hook: $(BUILT_SOURCES)
 	$(MAKE) qpidd

Modified: incubator/qpid/trunk/qpid/cpp/src/acl.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/acl.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/acl.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/acl.mk Wed Sep 10 21:45:26 2008
@@ -1,9 +1,9 @@
 #
 # acl library makefile fragment, to be included in Makefile.am
 # 
-lib_LTLIBRARIES += libqpidacl.la
+dmodule_LTLIBRARIES += acl.la
 
-libqpidacl_la_SOURCES = \
+acl_la_SOURCES = \
   qpid/acl/Acl.cpp \
   qpid/acl/Acl.h \
   qpid/acl/AclData.cpp \
@@ -12,6 +12,5 @@
   qpid/acl/AclReader.cpp \
   qpid/acl/AclReader.h
 
-libqpidacl_la_LIBADD = libqpidbroker.la
-
-
+acl_la_LIBADD= libqpidbroker.la
+acl_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Wed Sep 10 21:45:26 2008
@@ -1,11 +1,11 @@
 #
 # Cluster library makefile fragment, to be included in Makefile.am
 # 
-lib_LTLIBRARIES += libqpidcluster.la
+dmodule_LTLIBRARIES += cluster.la
 
 if CPG
 
-libqpidcluster_la_SOURCES = \
+cluster_la_SOURCES = \
   qpid/cluster/types.h \
   qpid/cluster/Cluster.cpp \
   qpid/cluster/Cluster.h \
@@ -31,10 +31,12 @@
   qpid/cluster/DumpClient.h \
   qpid/cluster/DumpClient.cpp
 
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
+cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
 
 else
 # Empty stub library to satisfy rpm spec file.
-libqpidcluster_la_SOURCES = 
+cluster_la_SOURCES = 
 
 endif
+
+cluster_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Wed Sep 10 21:45:26 2008
@@ -105,7 +105,7 @@
         throw Exception(QPID_MSG("Connection::open() was already called"));
 
     impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
-    impl->open(settings.host, settings.port);
+    impl->open();
     max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Sep 10 21:45:26 2008
@@ -40,7 +40,6 @@
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
-      connector(new Connector(v, settings, this)), 
       version(v)
 {
     QPID_LOG(debug, "ConnectionImpl created for " << version);
@@ -48,8 +47,6 @@
     handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
     handler.onClose = boost::bind(&ConnectionImpl::closed, this,
                                   CLOSE_CODE_NORMAL, std::string());
-    connector->setInputHandler(&handler);
-    connector->setShutdownHandler(this);
 
     //only set error handler once  open
     handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
@@ -59,7 +56,7 @@
     // Important to close the connector first, to ensure the
     // connector thread does not call on us while the destructor
     // is running.
-    connector->close(); 
+    if (connector) connector->close(); 
 }
 
 void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -93,9 +90,16 @@
 }
 
 
-void ConnectionImpl::open(const std::string& host, int port)
+void ConnectionImpl::open()
 {
-    QPID_LOG(info, "Connecting to " << host << ":" << port);
+    const std::string& protocol = handler.protocol;
+    const std::string& host = handler.host;
+    int port = handler.port;
+    QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
+
+    connector.reset(Connector::create(protocol, version, handler, this)); 
+    connector->setInputHandler(&handler);
+    connector->setShutdownHandler(this);
     connector->connect(host, port);
     connector->init();
     handler.waitForOpen();    

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Wed Sep 10 21:45:26 2008
@@ -69,7 +69,7 @@
     ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
     ~ConnectionImpl();
     
-    void open(const std::string& host, int port);
+    void open();
     bool isOpen() const;
 
     void addSession(const boost::shared_ptr<SessionImpl>&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Wed Sep 10 21:45:26 2008
@@ -28,6 +28,7 @@
 namespace client {
 
 ConnectionSettings::ConnectionSettings() :
+    protocol("tcp"),
     host("localhost"), 
     port(TcpAddress::DEFAULT_PORT),
     username("guest"), 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Wed Sep 10 21:45:26 2008
@@ -52,6 +52,11 @@
     virtual void configureSocket(qpid::sys::Socket&) const;
 
     /**
+     * The protocol used for the connection (defaults to 'tcp')
+     */
+    std::string protocol;
+
+    /**
      * The host (or ip address) to connect to (defaults to 'localhost').
      */
     std::string host;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Wed Sep 10 21:45:26 2008
@@ -32,6 +32,7 @@
 #include "qpid/Msg.h"
 
 #include <iostream>
+#include <map>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -43,7 +44,135 @@
 using boost::format;
 using boost::str;
 
-Connector::Connector(ProtocolVersion ver,
+// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
+namespace {
+    typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
+    
+    ProtocolRegistry& theProtocolRegistry() {
+        static ProtocolRegistry protocolRegistry;
+        
+        return protocolRegistry;
+    } 
+}
+
+Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+{
+    ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
+    if (i==theProtocolRegistry().end()) {
+        throw Exception(QPID_MSG("Unknown protocol: " << proto));
+    }
+    return (i->second)(v, s, c);
+}
+
+void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
+{
+    ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
+    if (i!=theProtocolRegistry().end()) {
+        QPID_LOG(error, "Tried to register protocol: " << proto << " more than once");
+    }
+    theProtocolRegistry()[proto] = connectorFactory;
+}
+
+class TCPConnector : public Connector, private sys::Runnable
+{
+    struct Buff;
+
+    /** Batch up frames for writing to aio. */
+    class Writer : public framing::FrameHandler {
+        typedef sys::AsynchIOBufferBase BufferBase;
+        typedef std::vector<framing::AMQFrame> Frames;
+
+        const uint16_t maxFrameSize;
+        sys::Mutex lock;
+        sys::AsynchIO* aio;
+        BufferBase* buffer;
+        Frames frames;
+        size_t lastEof; // Position after last EOF in frames
+        framing::Buffer encode;
+        size_t framesEncoded;
+        std::string identifier;
+        Bounds* bounds;        
+        
+        void writeOne();
+        void newBuffer();
+
+      public:
+        
+        Writer(uint16_t maxFrameSize, Bounds*);
+        ~Writer();
+        void init(std::string id, sys::AsynchIO*);
+        void handle(framing::AMQFrame&);
+        void write(sys::AsynchIO&);
+    };
+    
+    const uint16_t maxFrameSize;
+    framing::ProtocolVersion version;
+    bool initiated;
+
+    sys::Mutex closedLock;    
+    bool closed;
+    bool joined;
+
+    sys::ShutdownHandler* shutdownHandler;
+    framing::InputHandler* input;
+    framing::InitiationHandler* initialiser;
+    framing::OutputHandler* output;
+
+    Writer writer;
+    
+    sys::Thread receiver;
+
+    sys::Socket socket;
+
+    sys::AsynchIO* aio;
+    boost::shared_ptr<sys::Poller> poller;
+
+    ~TCPConnector();
+
+    void run();
+    void handleClosed();
+    bool closeInternal();
+    
+    void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+    void writebuff(qpid::sys::AsynchIO&);
+    void writeDataBlock(const framing::AMQDataBlock& data);
+    void eof(qpid::sys::AsynchIO&);
+
+    std::string identifier;
+
+    ConnectionImpl* impl;
+    
+    void connect(const std::string& host, int port);
+    void init();
+    void close();
+    void send(framing::AMQFrame& frame);
+
+    void setInputHandler(framing::InputHandler* handler);
+    void setShutdownHandler(sys::ShutdownHandler* handler);
+    sys::ShutdownHandler* getShutdownHandler() const;
+    framing::OutputHandler* getOutputHandler();
+    const std::string& getIdentifier() const;
+
+public:
+    TCPConnector(framing::ProtocolVersion pVersion,
+              const ConnectionSettings&, 
+              ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new TCPConnector(v, s, c);
+    }
+
+    struct StaticInit {
+        StaticInit() {
+            Connector::registerFactory("tcp", &create);
+        };
+    } init;
+}
+
+TCPConnector::TCPConnector(ProtocolVersion ver,
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
@@ -51,23 +180,20 @@
       initiated(false),
       closed(true),
       joined(true),
-      timeout(0),
-      idleIn(0), idleOut(0), 
-      timeoutHandler(0),
       shutdownHandler(0),
       writer(maxFrameSize, cimpl),
       aio(0),
       impl(cimpl)
 {
-    QPID_LOG(debug, "Connector created for " << version);
+    QPID_LOG(debug, "TCPConnector created for " << version);
     settings.configureSocket(socket);
 }
 
-Connector::~Connector() {
+TCPConnector::~TCPConnector() {
     close();
 }
 
-void Connector::connect(const std::string& host, int port){
+void TCPConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     socket.connect(host, port);
@@ -75,16 +201,16 @@
     closed = false;
     poller = Poller::shared_ptr(new Poller);
     aio = new AsynchIO(socket,
-                       boost::bind(&Connector::readbuff, this, _1, _2),
-                       boost::bind(&Connector::eof, this, _1),
-                       boost::bind(&Connector::eof, this, _1),
+                       boost::bind(&TCPConnector::readbuff, this, _1, _2),
+                       boost::bind(&TCPConnector::eof, this, _1),
+                       boost::bind(&TCPConnector::eof, this, _1),
                        0, // closed
                        0, // nobuffs
-                       boost::bind(&Connector::writebuff, this, _1));
+                       boost::bind(&TCPConnector::writebuff, this, _1));
     writer.init(identifier, aio);
 }
 
-void Connector::init(){
+void TCPConnector::init(){
     Mutex::ScopedLock l(closedLock);
     assert(joined);
     ProtocolInitiation init(version);
@@ -93,7 +219,7 @@
     receiver = Thread(this);
 }
 
-bool Connector::closeInternal() {
+bool TCPConnector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
     bool ret = !closed;
     if (!closed) {
@@ -108,49 +234,57 @@
     return ret;
 }
         
-void Connector::close() {
+void TCPConnector::close() {
     closeInternal();
 }
 
-void Connector::setInputHandler(InputHandler* handler){
+void TCPConnector::setInputHandler(InputHandler* handler){
     input = handler;
 }
 
-void Connector::setShutdownHandler(ShutdownHandler* handler){
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
     shutdownHandler = handler;
 }
 
-OutputHandler* Connector::getOutputHandler(){ 
+OutputHandler* TCPConnector::getOutputHandler() {
     return this; 
 }
 
-void Connector::send(AMQFrame& frame) {
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+    return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const { 
+    return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
     writer.handle(frame);
 }
 
-void Connector::handleClosed() {
+void TCPConnector::handleClosed() {
     if (closeInternal() && shutdownHandler)
         shutdownHandler->shutdown();
 }
 
-struct Connector::Buff : public AsynchIO::BufferBase {
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
     Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}    
     ~Buff() { delete [] bytes;}
 };
 
-Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
+TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
 {
 }
 
-Connector::Writer::~Writer() { delete buffer; }
+TCPConnector::Writer::~Writer() { delete buffer; }
 
-void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
+void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) {
     Mutex::ScopedLock l(lock);
     identifier = id;
     aio = a;
-    newBuffer(l);
+    newBuffer();
 }
-void Connector::Writer::handle(framing::AMQFrame& frame) { 
+void TCPConnector::Writer::handle(framing::AMQFrame& frame) { 
     Mutex::ScopedLock l(lock);
     frames.push_back(frame);
     if (frame.getEof()) {//or if we already have a buffers worth
@@ -160,17 +294,17 @@
     QPID_LOG(trace, "SENT " << identifier << ": " << frame);
 }
 
-void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+void TCPConnector::Writer::writeOne() {
     assert(buffer);
     framesEncoded = 0;
 
     buffer->dataStart = 0;
     buffer->dataCount = encode.getPosition();
     aio->queueWrite(buffer);
-    newBuffer(l);
+    newBuffer();
 }
 
-void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+void TCPConnector::Writer::newBuffer() {
     buffer = aio->getQueuedBuffer();
     if (!buffer) buffer = new Buff(maxFrameSize);
     encode = framing::Buffer(buffer->bytes, buffer->byteCount);
@@ -178,14 +312,14 @@
 }
 
 // Called in IO thread.
-void Connector::Writer::write(sys::AsynchIO&) {
+void TCPConnector::Writer::write(sys::AsynchIO&) {
     Mutex::ScopedLock l(lock);
     assert(buffer);
     size_t bytesWritten(0);
     for (size_t i = 0; i < lastEof; ++i) {
         AMQFrame& frame = frames[i];
         uint32_t size = frame.size();
-        if (size > encode.available()) writeOne(l);
+        if (size > encode.available()) writeOne();
         assert(size <= encode.available());
         frame.encode(encode);
         ++framesEncoded;
@@ -194,10 +328,10 @@
     frames.erase(frames.begin(), frames.begin()+lastEof);
     lastEof = 0;
     if (bounds) bounds->reduce(bytesWritten);
-    if (encode.getPosition() > 0) writeOne(l);
+    if (encode.getPosition() > 0) writeOne();
 }
 
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
     framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
 
     if (!initiated) {
@@ -226,11 +360,11 @@
     }
 }
 
-void Connector::writebuff(AsynchIO& aio_) {
+void TCPConnector::writebuff(AsynchIO& aio_) {
     writer.write(aio_);
 }
 
-void Connector::writeDataBlock(const AMQDataBlock& data) {
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
     AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
@@ -238,13 +372,13 @@
     aio->queueWrite(buff);
 }
 
-void Connector::eof(AsynchIO&) {
+void TCPConnector::eof(AsynchIO&) {
     handleClosed();
 }
 
 // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
 // will never be called
-void Connector::run(){
+void TCPConnector::run(){
     // Keep the connection impl in memory until run() completes.
     boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
     assert(protect);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Wed Sep 10 21:45:26 2008
@@ -40,109 +40,31 @@
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
-
-namespace sys {
-class Poller;
-class AsynchIO;
-class AsynchIOBufferBase;
-}
-	
 namespace client {
 
-class Bounds;
 class ConnectionSettings;
 class ConnectionImpl;
 
 ///@internal
-class Connector : public framing::OutputHandler, 
-                  private sys::Runnable
-{
-    struct Buff;
-
-    /** Batch up frames for writing to aio. */
-    class Writer : public framing::FrameHandler {
-        typedef sys::AsynchIOBufferBase BufferBase;
-        typedef std::vector<framing::AMQFrame> Frames;
-
-        const uint16_t maxFrameSize;
-        sys::Mutex lock;
-        sys::AsynchIO* aio;
-        BufferBase* buffer;
-        Frames frames;
-        size_t lastEof; // Position after last EOF in frames
-        framing::Buffer encode;
-        size_t framesEncoded;
-        std::string identifier;
-        Bounds* bounds;        
-        
-        void writeOne(const sys::Mutex::ScopedLock&);
-        void newBuffer(const sys::Mutex::ScopedLock&);
-
-      public:
-        
-        Writer(uint16_t maxFrameSize, Bounds*);
-        ~Writer();
-        void init(std::string id, sys::AsynchIO*);
-        void handle(framing::AMQFrame&);
-        void write(sys::AsynchIO&);
-    };
-    
-    const uint16_t maxFrameSize;
-    framing::ProtocolVersion version;
-    bool initiated;
-
-    sys::Mutex closedLock;    
-    bool closed;
-    bool joined;
-
-    sys::AbsTime lastIn;
-    sys::AbsTime lastOut;
-    sys::Duration timeout;
-    sys::Duration idleIn;
-    sys::Duration idleOut;
-
-    sys::TimeoutHandler* timeoutHandler;
-    sys::ShutdownHandler* shutdownHandler;
-    framing::InputHandler* input;
-    framing::InitiationHandler* initialiser;
-    framing::OutputHandler* output;
-
-    Writer writer;
-    
-    sys::Thread receiver;
-
-    sys::Socket socket;
-
-    sys::AsynchIO* aio;
-    boost::shared_ptr<sys::Poller> poller;
-
-    void run();
-    void handleClosed();
-    bool closeInternal();
-    
-    void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
-    void writebuff(qpid::sys::AsynchIO&);
-    void writeDataBlock(const framing::AMQDataBlock& data);
-    void eof(qpid::sys::AsynchIO&);
-
-    std::string identifier;
-
-    ConnectionImpl* impl;
-    
+class Connector : public framing::OutputHandler
+{    
   public:
-    Connector(framing::ProtocolVersion pVersion,
-              const ConnectionSettings&, 
-              ConnectionImpl*);
-    virtual ~Connector();
-    virtual void connect(const std::string& host, int port);
-    virtual void init();
-    virtual void close();
-    virtual void setInputHandler(framing::InputHandler* handler);
-    virtual void setShutdownHandler(sys::ShutdownHandler* handler);
-    virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
-    virtual framing::OutputHandler* getOutputHandler();
-    virtual void send(framing::AMQFrame& frame);
-    const std::string& getIdentifier() const { return identifier; }
+    // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future)
+    typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+    static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+    static void registerFactory(const std::string& proto, Factory* connectorFactory);
+
+    virtual ~Connector() {};
+    virtual void connect(const std::string& host, int port) = 0;
+    virtual void init() {};
+    virtual void close() = 0;
+    virtual void send(framing::AMQFrame& frame) = 0;
+
+    virtual void setInputHandler(framing::InputHandler* handler) = 0;
+    virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0;
+    virtual sys::ShutdownHandler* getShutdownHandler() const = 0;
+    virtual framing::OutputHandler* getOutputHandler() = 0;
+    virtual const std::string& getIdentifier() const = 0;
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp?rev=694113&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp Wed Sep 10 21:45:26 2008
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/log/Statement.h"
+#include "qpid/log/Options.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Shlib.h"
+#include <boost/filesystem/operations.hpp>
+#include <boost/filesystem/path.hpp>
+
+using namespace qpid;
+using namespace qpid::sys;
+using namespace qpid::log;
+using namespace std;
+namespace fs=boost::filesystem;
+
+struct ModuleOptions : public qpid::Options {
+    string         loadDir;
+    vector<string> load;
+    bool           noLoad;
+    ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false)
+    {
+        addOptions()
+            ("module-dir",    optValue(loadDir, "DIR"),  "Load all .so modules in this directory")
+            ("load-module",   optValue(load,    "FILE"), "Specifies additional module(s) to be loaded")
+            ("no-module-dir", optValue(noLoad),          "Don't load modules from module directory");
+    }
+};
+
+// TODO: The following is copied from qpidd.cpp - it needs to be common code
+void tryShlib(const char* libname, bool noThrow) {
+    try {
+        Shlib shlib(libname);
+        QPID_LOG (info, "Loaded Module: " << libname);
+    }
+    catch (const exception& e) {
+        if (!noThrow)
+            throw;
+    }
+}
+
+void loadModuleDir (string dirname, bool isDefault)
+{
+    fs::path dirPath (dirname, fs::native);
+
+    if (!fs::exists (dirPath))
+    {
+        if (isDefault)
+            return;
+        throw Exception ("Directory not found: " + dirname);
+    }
+
+    fs::directory_iterator endItr;
+    for (fs::directory_iterator itr (dirPath); itr != endItr; ++itr)
+    {
+        if (!fs::is_directory(*itr) &&
+            itr->string().find (".so") == itr->string().length() - 3)
+            tryShlib (itr->string().data(), true);
+    }
+}
+
+struct LoadtimeInitialise {
+    LoadtimeInitialise() {
+        ModuleOptions moduleOptions;
+        string           defaultPath (moduleOptions.loadDir);
+        moduleOptions.parse (0, 0, CONF_FILE, true);
+    
+        for (vector<string>::iterator iter = moduleOptions.load.begin();
+             iter != moduleOptions.load.end();
+             iter++)
+            tryShlib (iter->data(), false);
+    
+        if (!moduleOptions.noLoad) {
+            bool isDefault = defaultPath == moduleOptions.loadDir;
+            loadModuleDir (moduleOptions.loadDir, isDefault);
+        }
+    }
+} init;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Wed Sep 10 21:45:26 2008
@@ -34,7 +34,6 @@
 #include <fstream>
 #include <signal.h>
 #include <unistd.h>
-#include <sys/utsname.h>
 
 using namespace qpid;
 using namespace qpid::broker;
@@ -47,14 +46,8 @@
     string         loadDir;
     vector<string> load;
     bool           noLoad;
-    ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false)
+    ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false)
     {
-        struct utsname _uname;
-        if (::uname(&_uname) == 0) {
-            if (string(_uname.machine) == "x86_64")
-                loadDir = "/usr/lib64/qpidd";
-        }
-
         addOptions()
             ("module-dir",    optValue(loadDir, "DIR"),  "Load all .so modules in this directory")
             ("load-module",   optValue(load,    "FILE"), "Specifies additional module(s) to be loaded")
@@ -97,7 +90,7 @@
     DaemonOptions daemon;
     qpid::log::Options log;
     
-    QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) {
+    QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) {
         add(common);
         add(module);
         add(broker);
@@ -121,7 +114,7 @@
     ModuleOptions module;
     qpid::log::Options log;     
 
-    BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) {
+    BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) {
         add(common);
         add(module);
         add(log);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h Wed Sep 10 21:45:26 2008
@@ -37,6 +37,7 @@
         addOptions()
             ("broker,b", optValue(host, "HOST"), "Broker host to connect to") 
             ("port,p", optValue(port, "PORT"), "Broker port to connect to")
+            ("protocol,P", optValue(protocol, "tcp|rdma"), "Protocol to use for broker connection")            
             ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
             ("username", optValue(username, "USER"), "user name for broker log in.")
             ("password", optValue(password, "PASSWORD"), "password for broker log in.")

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Wed Sep 10 21:45:26 2008
@@ -3,7 +3,7 @@
 # Cluster tests makefile fragment, to be included in Makefile.am
 # 
 
-lib_cluster = $(abs_builddir)/../libqpidcluster.la
+lib_cluster = $(abs_builddir)/../cluster.la
 
 # NOTE: Programs using the openais library must be run with gid=ais
 # You should do "newgrp ais" before running the tests to run these.