You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/11 06:45:27 UTC
svn commit: r694113 - in /incubator/qpid/trunk/qpid/cpp: etc/ src/
src/qpid/client/ src/tests/
Author: astitcher
Date: Wed Sep 10 21:45:26 2008
New Revision: 694113
URL: http://svn.apache.org/viewvc?rev=694113&view=rev
Log:
Refactored c++ client library to allow multiple protocols to be
used simultaneously:
- Added in capability for client library plugins:
Client library will load in plugin modules from
the client library module directory on library load.
- Add protocol option into the standard client command line options
- Split plugin module load area into daemon and client;
default daemon module directory is now <libdir>/qpid/daemon,
default client module directory is <libdir>/qpid/client.
- Changed names of plugins to leave out libqpid prefix
Added:
incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/etc/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/acl.mk
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: incubator/qpid/trunk/qpid/cpp/etc/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/etc/Makefile.am?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/etc/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/etc/Makefile.am Wed Sep 10 21:45:26 2008
@@ -4,6 +4,10 @@
$(SASL_CONF) \
qpidd qpidd.conf
+confdir=$(sysconfdir)/qpid
+nobase_conf_DATA=\
+ qpidc.conf
+
nobase_sysconf_DATA = \
qpidd.conf
Added: incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf?rev=694113&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf (added)
+++ incubator/qpid/trunk/qpid/cpp/etc/qpidc.conf Wed Sep 10 21:45:26 2008
@@ -0,0 +1,2 @@
+# Configuration file for the qpid client library. Entries are of the form:
+# name = value
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 10 21:45:26 2008
@@ -51,12 +51,13 @@
DISTCLEANFILES=qpid/framing/MaxMethodBodySize.h
## Compiler flags
-
AM_CXXFLAGS = $(WARNING_CFLAGS)
AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
INCLUDES = -Igen -I$(srcdir)/gen
## Automake macros to build libraries and executables.
+qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(dmoduledir)\" -DCONF_FILE=\"$(sysconfdir)/qpidd.conf\"
+libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DMODULE_DIR=\"$(cmoduledir)\" -DCONF_FILE=\"$(confdir)/qpidc.conf\"
qpidd_LDADD = \
libqpidbroker.la \
@@ -95,7 +96,15 @@
platform_src = $(posix_plat_src) $(poller)
platform_hdr = $(posix_plat_hdr)
-lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la
+lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la
+
+# Definitions for client and daemon plugins
+PLUGINLDFLAGS=-no-undefined -module -avoid-version
+confdir=$(sysconfdir)/qpid
+dmoduledir=$(libdir)/qpid/daemon
+cmoduledir=$(libdir)/qpid/client
+dmodule_LTLIBRARIES =
+cmodule_LTLIBRARIES =
include cluster.mk
include acl.mk
@@ -113,33 +122,35 @@
if RDMA
# RDMA (Infiniband) protocol code
-libqpidrdma_la_SOURCES = \
+librdmawrap_la_SOURCES = \
qpid/sys/rdma/rdma_exception.h \
qpid/sys/rdma/rdma_factories.cpp \
qpid/sys/rdma/RdmaIO.cpp \
qpid/sys/rdma/RdmaIO.h \
qpid/sys/rdma/rdma_wrap.h
-libqpidrdma_la_LIBADD = \
+librdmawrap_la_LIBADD = \
+ libqpidcommon.la \
-lrdmacm \
-libverbs
-libqpidrdma_la_CXXFLAGS = \
+librdmawrap_la_CXXFLAGS = \
$(AM_CXXFLAGS) -Wno-missing-field-initializers
-noinst_LTLIBRARIES += \
- libqpidrdma.la
-qpidd_LDADD += \
- libqpidrdma.la
+lib_LTLIBRARIES += \
+ librdmawrap.la
+librdmawrap_la_LDFLAGS = \
+ -no-undefined
+# RDMA test/sample programs
noinst_PROGRAMS += RdmaServer RdmaClient
RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
RdmaServer_CXXFLAGS = \
$(AM_CXXFLAGS) -Wno-missing-field-initializers
RdmaServer_LDADD = \
- libqpidrdma.la libqpidcommon.la
+ librdmawrap.la libqpidcommon.la
RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp
RdmaClient_CXXFLAGS = \
$(AM_CXXFLAGS) -Wno-missing-field-initializers
RdmaClient_LDADD = \
- libqpidrdma.la libqpidcommon.la
+ librdmawrap.la libqpidcommon.la
endif
@@ -329,16 +340,17 @@
$(rgen_client_srcs) \
qpid/client/AckPolicy.cpp \
qpid/client/Bounds.cpp \
- qpid/client/ConnectionImpl.cpp \
- qpid/client/Connector.cpp \
qpid/client/Connection.cpp \
qpid/client/ConnectionHandler.cpp \
+ qpid/client/ConnectionImpl.cpp \
qpid/client/ConnectionSettings.cpp \
+ qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResult.cpp \
+ qpid/client/LoadPlugins.cpp \
qpid/client/LocalQueue.cpp \
qpid/client/Message.cpp \
qpid/client/MessageListener.cpp \
@@ -579,7 +591,6 @@
nobase_include_HEADERS += qpid/broker/XmlExchange.h
endif
-
# Force build of qpidd during dist phase so help2man will work.
dist-hook: $(BUILT_SOURCES)
$(MAKE) qpidd
Modified: incubator/qpid/trunk/qpid/cpp/src/acl.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/acl.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/acl.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/acl.mk Wed Sep 10 21:45:26 2008
@@ -1,9 +1,9 @@
#
# acl library makefile fragment, to be included in Makefile.am
#
-lib_LTLIBRARIES += libqpidacl.la
+dmodule_LTLIBRARIES += acl.la
-libqpidacl_la_SOURCES = \
+acl_la_SOURCES = \
qpid/acl/Acl.cpp \
qpid/acl/Acl.h \
qpid/acl/AclData.cpp \
@@ -12,6 +12,5 @@
qpid/acl/AclReader.cpp \
qpid/acl/AclReader.h
-libqpidacl_la_LIBADD = libqpidbroker.la
-
-
+acl_la_LIBADD= libqpidbroker.la
+acl_la_LDFLAGS = $(PLUGINLDFLAGS)
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Wed Sep 10 21:45:26 2008
@@ -1,11 +1,11 @@
#
# Cluster library makefile fragment, to be included in Makefile.am
#
-lib_LTLIBRARIES += libqpidcluster.la
+dmodule_LTLIBRARIES += cluster.la
if CPG
-libqpidcluster_la_SOURCES = \
+cluster_la_SOURCES = \
qpid/cluster/types.h \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
@@ -31,10 +31,12 @@
qpid/cluster/DumpClient.h \
qpid/cluster/DumpClient.cpp
-libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
+cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
else
# Empty stub library to satisfy rpm spec file.
-libqpidcluster_la_SOURCES =
+cluster_la_SOURCES =
endif
+
+cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Wed Sep 10 21:45:26 2008
@@ -105,7 +105,7 @@
throw Exception(QPID_MSG("Connection::open() was already called"));
impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
- impl->open(settings.host, settings.port);
+ impl->open();
max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Sep 10 21:45:26 2008
@@ -40,7 +40,6 @@
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- connector(new Connector(v, settings, this)),
version(v)
{
QPID_LOG(debug, "ConnectionImpl created for " << version);
@@ -48,8 +47,6 @@
handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
CLOSE_CODE_NORMAL, std::string());
- connector->setInputHandler(&handler);
- connector->setShutdownHandler(this);
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
@@ -59,7 +56,7 @@
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- connector->close();
+ if (connector) connector->close();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -93,9 +90,16 @@
}
-void ConnectionImpl::open(const std::string& host, int port)
+void ConnectionImpl::open()
{
- QPID_LOG(info, "Connecting to " << host << ":" << port);
+ const std::string& protocol = handler.protocol;
+ const std::string& host = handler.host;
+ int port = handler.port;
+ QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
+
+ connector.reset(Connector::create(protocol, version, handler, this));
+ connector->setInputHandler(&handler);
+ connector->setShutdownHandler(this);
connector->connect(host, port);
connector->init();
handler.waitForOpen();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Wed Sep 10 21:45:26 2008
@@ -69,7 +69,7 @@
ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
~ConnectionImpl();
- void open(const std::string& host, int port);
+ void open();
bool isOpen() const;
void addSession(const boost::shared_ptr<SessionImpl>&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Wed Sep 10 21:45:26 2008
@@ -28,6 +28,7 @@
namespace client {
ConnectionSettings::ConnectionSettings() :
+ protocol("tcp"),
host("localhost"),
port(TcpAddress::DEFAULT_PORT),
username("guest"),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.h Wed Sep 10 21:45:26 2008
@@ -52,6 +52,11 @@
virtual void configureSocket(qpid::sys::Socket&) const;
/**
+ * The protocol used for the connection (defaults to 'tcp')
+ */
+ std::string protocol;
+
+ /**
* The host (or ip address) to connect to (defaults to 'localhost').
*/
std::string host;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Wed Sep 10 21:45:26 2008
@@ -32,6 +32,7 @@
#include "qpid/Msg.h"
#include <iostream>
+#include <map>
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -43,7 +44,135 @@
using boost::format;
using boost::str;
-Connector::Connector(ProtocolVersion ver,
+// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
+namespace {
+    /** Lookup table from protocol name (e.g. "tcp") to the factory that builds its Connector. */
+    typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
+
+    /**
+     * Accessor for the single registry instance.
+     * The map is a function-local static, so it is constructed the first
+     * time this accessor is called rather than at a fixed point during
+     * static initialisation.
+     */
+    ProtocolRegistry& theProtocolRegistry() {
+        static ProtocolRegistry registry;
+        return registry;
+    }
+}
+
+/**
+ * Build a connector for the named protocol.
+ *
+ * Looks `proto` up in the protocol registry and invokes the registered
+ * factory with (v, s, c), returning whatever the factory returns.
+ * Throws Exception("Unknown protocol: ...") when no factory has been
+ * registered under that name.
+ */
+Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+{
+    const ProtocolRegistry& registry = theProtocolRegistry();
+    const ProtocolRegistry::const_iterator entry = registry.find(proto);
+    if (entry != registry.end())
+        return (entry->second)(v, s, c);
+    throw Exception(QPID_MSG("Unknown protocol: " << proto));
+}
+
+/**
+ * Install the factory that create() will use for protocol `proto`.
+ *
+ * Registering a name that is already present is reported through the
+ * error log; the newer factory then replaces the earlier one.
+ */
+void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
+{
+    ProtocolRegistry& registry = theProtocolRegistry();
+    if (registry.count(proto) != 0) {
+        QPID_LOG(error, "Tried to register protocol: " << proto << " more than once");
+    }
+    registry[proto] = connectorFactory;
+}
+
+// Connector implementation that is registered under the protocol name "tcp".
+// It is also a sys::Runnable: init() hands `this` to the `receiver` thread.
+// Everything except the constructor is private, so the class is driven
+// through the Connector base-class interface.
+class TCPConnector : public Connector, private sys::Runnable
+{
+ // Write buffer type; defined later in the .cpp as an AsynchIO::BufferBase
+ // that owns a new[]-allocated byte array.
+ struct Buff;
+
+ /** Batch up frames for writing to aio. */
+ class Writer : public framing::FrameHandler {
+ typedef sys::AsynchIOBufferBase BufferBase;
+ typedef std::vector<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+ // Taken by init(), handle() and write().
+ sys::Mutex lock;
+ sys::AsynchIO* aio;
+ // Buffer currently being encoded into; deleted by ~Writer().
+ BufferBase* buffer;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ framing::Buffer encode;
+ size_t framesEncoded;
+ std::string identifier;
+ // May be null: write() only calls bounds->reduce() when it is set.
+ Bounds* bounds;
+
+ // Both helpers are called from init()/write(), which already hold `lock`.
+ void writeOne();
+ void newBuffer();
+
+ public:
+
+ Writer(uint16_t maxFrameSize, Bounds*);
+ ~Writer();
+ void init(std::string id, sys::AsynchIO*);
+ void handle(framing::AMQFrame&);
+ void write(sys::AsynchIO&);
+ };
+
+ const uint16_t maxFrameSize;
+ framing::ProtocolVersion version;
+ bool initiated;
+
+ // closedLock is taken by connect(), init() and closeInternal().
+ sys::Mutex closedLock;
+ // Both flags start out true in the constructor; connect() asserts
+ // `closed` and init() asserts `joined`.
+ bool closed;
+ bool joined;
+
+ // Called from handleClosed() when closeInternal() reports that it
+ // actually closed the connection; may be null.
+ sys::ShutdownHandler* shutdownHandler;
+ // NOTE(review): input/initialiser/output do not appear in the part of the
+ // constructor's initialiser list visible in this change - confirm they are
+ // always assigned before use.
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ Writer writer;
+
+ sys::Thread receiver;
+
+ sys::Socket socket;
+
+ // Created with new in connect(); null until then.
+ sys::AsynchIO* aio;
+ boost::shared_ptr<sys::Poller> poller;
+
+ // The destructor calls close().
+ ~TCPConnector();
+
+ void run();
+ void handleClosed();
+ // Returns true only if the connector was open when called.
+ bool closeInternal();
+
+ // Callbacks bound into the AsynchIO object created by connect().
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void writebuff(qpid::sys::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(qpid::sys::AsynchIO&);
+
+ std::string identifier;
+
+ ConnectionImpl* impl;
+
+ // Implementations of the Connector virtual interface. They are declared
+ // in the private section here, so callers reach them via a Connector*.
+ void connect(const std::string& host, int port);
+ void init();
+ void close();
+ void send(framing::AMQFrame& frame);
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+
+public:
+ TCPConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+    /** Factory for the built-in TCP transport; has the Connector::Factory signature. */
+    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new TCPConnector(v, s, c);
+    }
+
+    /**
+     * Registers the "tcp" factory from its constructor. The single
+     * file-local instance `init` makes that happen while this translation
+     * unit's static objects are being initialised.
+     */
+    struct StaticInit {
+        StaticInit() { Connector::registerFactory("tcp", &create); }
+    } init;
+}
+
+TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
@@ -51,23 +180,20 @@
initiated(false),
closed(true),
joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
shutdownHandler(0),
writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl)
{
- QPID_LOG(debug, "Connector created for " << version);
+ QPID_LOG(debug, "TCPConnector created for " << version);
settings.configureSocket(socket);
}
-Connector::~Connector() {
+TCPConnector::~TCPConnector() {
close();
}
-void Connector::connect(const std::string& host, int port){
+void TCPConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(closedLock);
assert(closed);
socket.connect(host, port);
@@ -75,16 +201,16 @@
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1),
+ boost::bind(&TCPConnector::readbuff, this, _1, _2),
+ boost::bind(&TCPConnector::eof, this, _1),
+ boost::bind(&TCPConnector::eof, this, _1),
0, // closed
0, // nobuffs
- boost::bind(&Connector::writebuff, this, _1));
+ boost::bind(&TCPConnector::writebuff, this, _1));
writer.init(identifier, aio);
}
-void Connector::init(){
+void TCPConnector::init(){
Mutex::ScopedLock l(closedLock);
assert(joined);
ProtocolInitiation init(version);
@@ -93,7 +219,7 @@
receiver = Thread(this);
}
-bool Connector::closeInternal() {
+bool TCPConnector::closeInternal() {
Mutex::ScopedLock l(closedLock);
bool ret = !closed;
if (!closed) {
@@ -108,49 +234,57 @@
return ret;
}
-void Connector::close() {
+void TCPConnector::close() {
closeInternal();
}
-void Connector::setInputHandler(InputHandler* handler){
+void TCPConnector::setInputHandler(InputHandler* handler){
input = handler;
}
-void Connector::setShutdownHandler(ShutdownHandler* handler){
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
-OutputHandler* Connector::getOutputHandler(){
+OutputHandler* TCPConnector::getOutputHandler() {
return this;
}
-void Connector::send(AMQFrame& frame) {
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const {
+ return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
writer.handle(frame);
}
-void Connector::handleClosed() {
+void TCPConnector::handleClosed() {
if (closeInternal() && shutdownHandler)
shutdownHandler->shutdown();
}
-struct Connector::Buff : public AsynchIO::BufferBase {
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
+TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
-Connector::Writer::~Writer() { delete buffer; }
+TCPConnector::Writer::~Writer() { delete buffer; }
-void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
+void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) {
Mutex::ScopedLock l(lock);
identifier = id;
aio = a;
- newBuffer(l);
+ newBuffer();
}
-void Connector::Writer::handle(framing::AMQFrame& frame) {
+void TCPConnector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
if (frame.getEof()) {//or if we already have a buffers worth
@@ -160,17 +294,17 @@
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}
-void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+void TCPConnector::Writer::writeOne() {
assert(buffer);
framesEncoded = 0;
buffer->dataStart = 0;
buffer->dataCount = encode.getPosition();
aio->queueWrite(buffer);
- newBuffer(l);
+ newBuffer();
}
-void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+void TCPConnector::Writer::newBuffer() {
buffer = aio->getQueuedBuffer();
if (!buffer) buffer = new Buff(maxFrameSize);
encode = framing::Buffer(buffer->bytes, buffer->byteCount);
@@ -178,14 +312,14 @@
}
// Called in IO thread.
-void Connector::Writer::write(sys::AsynchIO&) {
+void TCPConnector::Writer::write(sys::AsynchIO&) {
Mutex::ScopedLock l(lock);
assert(buffer);
size_t bytesWritten(0);
for (size_t i = 0; i < lastEof; ++i) {
AMQFrame& frame = frames[i];
uint32_t size = frame.size();
- if (size > encode.available()) writeOne(l);
+ if (size > encode.available()) writeOne();
assert(size <= encode.available());
frame.encode(encode);
++framesEncoded;
@@ -194,10 +328,10 @@
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne(l);
+ if (encode.getPosition() > 0) writeOne();
}
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
if (!initiated) {
@@ -226,11 +360,11 @@
}
}
-void Connector::writebuff(AsynchIO& aio_) {
+void TCPConnector::writebuff(AsynchIO& aio_) {
writer.write(aio_);
}
-void Connector::writeDataBlock(const AMQDataBlock& data) {
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -238,13 +372,13 @@
aio->queueWrite(buff);
}
-void Connector::eof(AsynchIO&) {
+void TCPConnector::eof(AsynchIO&) {
handleClosed();
}
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
-void Connector::run(){
+void TCPConnector::run(){
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
assert(protect);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Wed Sep 10 21:45:26 2008
@@ -40,109 +40,31 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
-
-namespace sys {
-class Poller;
-class AsynchIO;
-class AsynchIOBufferBase;
-}
-
namespace client {
-class Bounds;
class ConnectionSettings;
class ConnectionImpl;
///@internal
-class Connector : public framing::OutputHandler,
- private sys::Runnable
-{
- struct Buff;
-
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef sys::AsynchIOBufferBase BufferBase;
- typedef std::vector<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- sys::AsynchIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne(const sys::Mutex::ScopedLock&);
- void newBuffer(const sys::Mutex::ScopedLock&);
-
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, sys::AsynchIO*);
- void handle(framing::AMQFrame&);
- void write(sys::AsynchIO&);
- };
-
- const uint16_t maxFrameSize;
- framing::ProtocolVersion version;
- bool initiated;
-
- sys::Mutex closedLock;
- bool closed;
- bool joined;
-
- sys::AbsTime lastIn;
- sys::AbsTime lastOut;
- sys::Duration timeout;
- sys::Duration idleIn;
- sys::Duration idleOut;
-
- sys::TimeoutHandler* timeoutHandler;
- sys::ShutdownHandler* shutdownHandler;
- framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
-
- Writer writer;
-
- sys::Thread receiver;
-
- sys::Socket socket;
-
- sys::AsynchIO* aio;
- boost::shared_ptr<sys::Poller> poller;
-
- void run();
- void handleClosed();
- bool closeInternal();
-
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
- void writebuff(qpid::sys::AsynchIO&);
- void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(qpid::sys::AsynchIO&);
-
- std::string identifier;
-
- ConnectionImpl* impl;
-
+class Connector : public framing::OutputHandler
+{
public:
- Connector(framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
- ConnectionImpl*);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init();
- virtual void close();
- virtual void setInputHandler(framing::InputHandler* handler);
- virtual void setShutdownHandler(sys::ShutdownHandler* handler);
- virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
- virtual framing::OutputHandler* getOutputHandler();
- virtual void send(framing::AMQFrame& frame);
- const std::string& getIdentifier() const { return identifier; }
+ // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future)
+ typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ static void registerFactory(const std::string& proto, Factory* connectorFactory);
+
+ virtual ~Connector() {};
+ virtual void connect(const std::string& host, int port) = 0;
+ virtual void init() {};
+ virtual void close() = 0;
+ virtual void send(framing::AMQFrame& frame) = 0;
+
+ virtual void setInputHandler(framing::InputHandler* handler) = 0;
+ virtual void setShutdownHandler(sys::ShutdownHandler* handler) = 0;
+ virtual sys::ShutdownHandler* getShutdownHandler() const = 0;
+ virtual framing::OutputHandler* getOutputHandler() = 0;
+ virtual const std::string& getIdentifier() const = 0;
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp?rev=694113&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LoadPlugins.cpp Wed Sep 10 21:45:26 2008
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/log/Statement.h"
+#include "qpid/log/Options.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Shlib.h"
+#include <boost/filesystem/operations.hpp>
+#include <boost/filesystem/path.hpp>
+
+using namespace qpid;
+using namespace qpid::sys;
+using namespace qpid::log;
+using namespace std;
+namespace fs=boost::filesystem;
+
+// Options that control which plugin modules the client library loads.
+// loadDir defaults to the compile-time MODULE_DIR, noLoad defaults to false,
+// and `load` starts out empty.
+// NOTE(review): qpidd.cpp also has a ModuleOptions whose MODULE_DIR expands to
+// a different directory. If both are at global scope and can end up in one
+// process, that is a one-definition-rule clash - confirm, and consider an
+// anonymous namespace or shared common code.
+struct ModuleOptions : public qpid::Options {
+ // Directory scanned for modules ("module-dir").
+ string loadDir;
+ // Extra modules named individually ("load-module").
+ vector<string> load;
+ // When true the module directory is not scanned ("no-module-dir").
+ bool noLoad;
+ ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false)
+ {
+ addOptions()
+ ("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory")
+ ("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded")
+ ("no-module-dir", optValue(noLoad), "Don't load modules from module directory");
+ }
+};
+
+// TODO: The following is copied from qpidd.cpp - it needs to be common code
+/**
+ * Attempt to load the shared library `libname`.
+ *
+ * @param libname  path of the module, as a NUL-terminated string.
+ * @param noThrow  when false, a load failure is rethrown to the caller;
+ *                 when true, the failure is logged and the function
+ *                 returns normally so that remaining modules can still
+ *                 be tried.
+ */
+void tryShlib(const char* libname, bool noThrow) {
+    try {
+        Shlib shlib(libname);
+        QPID_LOG (info, "Loaded Module: " << libname);
+    }
+    catch (const exception& e) {
+        if (!noThrow)
+            throw;
+        // Best-effort load: carry on, but record why the module was skipped
+        // instead of discarding the error without a trace.
+        QPID_LOG (info, "Skipped Module: " << libname << ": " << e.what());
+    }
+}
+
+/**
+ * Try to load every non-directory entry in `dirname` whose name ends in ".so".
+ *
+ * @param dirname    directory to scan.
+ * @param isDefault  true when `dirname` is the built-in default; a missing
+ *                   default directory is silently ignored, whereas a
+ *                   missing non-default directory throws Exception.
+ *
+ * Individual modules are loaded with tryShlib(..., true), so a module that
+ * fails to load does not stop the scan.
+ */
+void loadModuleDir (string dirname, bool isDefault)
+{
+    fs::path dirPath (dirname, fs::native);
+
+    if (!fs::exists (dirPath))
+    {
+        if (isDefault)
+            return;
+        throw Exception ("Directory not found: " + dirname);
+    }
+
+    const string suffix (".so");
+    fs::directory_iterator endItr;
+    for (fs::directory_iterator itr (dirPath); itr != endItr; ++itr)
+    {
+        if (fs::is_directory(*itr))
+            continue;
+
+        const string file (itr->string());
+        // Compare the tail of the path with the suffix. Searching for the
+        // first ".so" would reject paths that contain ".so" earlier on
+        // (e.g. a ".so.d" directory component), and the size check keeps
+        // the unsigned subtraction from wrapping.
+        if (file.size() >= suffix.size() &&
+            file.compare (file.size() - suffix.size(), suffix.size(), suffix) == 0)
+            tryShlib (file.c_str(), true);   // c_str(): guaranteed NUL-terminated
+    }
+}
+
+/**
+ * Loads client plugin modules from the constructor of the file-level
+ * instance `init`, i.e. while this file's static objects are initialised.
+ *
+ * Options are read from CONF_FILE. Modules listed individually are loaded
+ * first and must succeed; the module directory is then scanned unless
+ * that has been disabled.
+ *
+ * NOTE(review): an exception thrown here escapes a static initialiser, so
+ * the application has no opportunity to catch it - confirm this is the
+ * intended failure mode for a bad "load-module"/"module-dir" setting.
+ */
+struct LoadtimeInitialise {
+    LoadtimeInitialise() {
+        ModuleOptions moduleOptions;
+        // Capture the compiled-in directory before parsing can replace it,
+        // so we can tell below whether the directory was overridden.
+        string defaultPath (moduleOptions.loadDir);
+        moduleOptions.parse (0, 0, CONF_FILE, true);
+
+        // Explicitly requested modules: failures propagate (noThrow=false).
+        for (vector<string>::iterator iter = moduleOptions.load.begin();
+             iter != moduleOptions.load.end();
+             ++iter)
+            tryShlib (iter->c_str(), false);   // c_str(): guaranteed NUL-terminated
+
+        if (!moduleOptions.noLoad) {
+            bool isDefault = defaultPath == moduleOptions.loadDir;
+            loadModuleDir (moduleOptions.loadDir, isDefault);
+        }
+    }
+} init;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Wed Sep 10 21:45:26 2008
@@ -34,7 +34,6 @@
#include <fstream>
#include <signal.h>
#include <unistd.h>
-#include <sys/utsname.h>
using namespace qpid;
using namespace qpid::broker;
@@ -47,14 +46,8 @@
string loadDir;
vector<string> load;
bool noLoad;
- ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false)
+ ModuleOptions() : qpid::Options("Module options"), loadDir(MODULE_DIR), noLoad(false)
{
- struct utsname _uname;
- if (::uname(&_uname) == 0) {
- if (string(_uname.machine) == "x86_64")
- loadDir = "/usr/lib64/qpidd";
- }
-
addOptions()
("module-dir", optValue(loadDir, "DIR"), "Load all .so modules in this directory")
("load-module", optValue(load, "FILE"), "Specifies additional module(s) to be loaded")
@@ -97,7 +90,7 @@
DaemonOptions daemon;
qpid::log::Options log;
- QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) {
+ QpiddOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) {
add(common);
add(module);
add(broker);
@@ -121,7 +114,7 @@
ModuleOptions module;
qpid::log::Options log;
- BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", "/etc/qpidd.conf"), log(argv0) {
+ BootstrapOptions(const char* argv0) : qpid::Options("Options"), common("", CONF_FILE), log(argv0) {
add(common);
add(module);
add(log);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ConnectionOptions.h Wed Sep 10 21:45:26 2008
@@ -37,6 +37,7 @@
addOptions()
("broker,b", optValue(host, "HOST"), "Broker host to connect to")
("port,p", optValue(port, "PORT"), "Broker port to connect to")
+ ("protocol,P", optValue(protocol, "tcp|rdma"), "Protocol to use for broker connection")
("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
("username", optValue(username, "USER"), "user name for broker log in.")
("password", optValue(password, "PASSWORD"), "password for broker log in.")
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=694113&r1=694112&r2=694113&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Wed Sep 10 21:45:26 2008
@@ -3,7 +3,7 @@
# Cluster tests makefile fragment, to be included in Makefile.am
#
-lib_cluster = $(abs_builddir)/../libqpidcluster.la
+lib_cluster = $(abs_builddir)/../cluster.la
# NOTE: Programs using the openais library must be run with gid=ais
# You should do "newgrp ais" before running the tests to run these.