You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/01/21 07:17:29 UTC
svn commit: r901550 - in /qpid/trunk/qpid/cpp/src/qpid/client:
ConnectionImpl.cpp Connector.cpp Connector.h RdmaConnector.cpp
SslConnector.cpp TCPConnector.cpp TCPConnector.h
Author: astitcher
Date: Thu Jan 21 06:17:10 2010
New Revision: 901550
URL: http://svn.apache.org/viewvc?rev=901550&view=rev
Log:
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
- The client threads are initialised (via a singleton) when first
used in a Connection::open()
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Jan 21 06:17:10 2010
@@ -18,7 +18,9 @@
* under the License.
*
*/
+
#include "qpid/client/ConnectionImpl.h"
+
#include "qpid/client/Connector.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SessionImpl.h"
@@ -27,11 +29,20 @@
#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/Options.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
#include <limits>
+#include <vector>
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
namespace qpid {
namespace client {
@@ -41,7 +52,10 @@
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-// Get timer singleton
+namespace {
+// Maybe should amalgamate the singletons into a single client singleton
+
+// Get timer singleton
Timer& theTimer() {
static Mutex timerInitLock;
ScopedLock<Mutex> l(timerInitLock);
@@ -50,6 +64,76 @@
return t;
}
+struct IOThreadOptions : public qpid::Options {
+ int maxIOThreads;
+
+ IOThreadOptions(int c) :
+ Options("IO threading options"),
+ maxIOThreads(c)
+ {
+ addOptions()
+ ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use");
+ }
+};
+
+// IO threads
+class IOThread {
+ int maxIOThreads;
+ int ioThreads;
+ int connections;
+ Mutex threadLock;
+ std::vector<Thread> t;
+ Poller::shared_ptr poller_;
+
+public:
+ void add() {
+ ScopedLock<Mutex> l(threadLock);
+ ++connections;
+ if (ioThreads < maxIOThreads) {
+ QPID_LOG(debug, "Created IO thread: " << ioThreads);
+ ++ioThreads;
+ t.push_back( Thread(poller_.get()) );
+ }
+ }
+
+ void sub() {
+ ScopedLock<Mutex> l(threadLock);
+ --connections;
+ }
+
+ Poller::shared_ptr poller() const {
+ return poller_;
+ }
+
+ // Here is where the maximum number of threads is set
+ IOThread(int c) :
+ ioThreads(0),
+ connections(0),
+ poller_(new Poller)
+ {
+ IOThreadOptions options(c);
+ options.parse(0, 0, QPIDC_CONF_FILE, true);
+ maxIOThreads = (options.maxIOThreads != -1) ?
+ options.maxIOThreads : 1;
+ }
+
+ // We can't destroy threads one-by-one as the only
+ // control we have is to shutdown the whole lot
+ // and we can't do that before we're unloaded as we can't
+ // restart the Poller after shutting it down
+ ~IOThread() {
+ poller_->shutdown();
+ for (int i=0; i<ioThreads; ++i) {
+ t[i].join();
+ }
+ }
+};
+
+IOThread& theIO() {
+ static IOThread io(SystemInfo::concurrency());
+ return io;
+}
+
class HeartbeatTask : public TimerTask {
TimeoutHandler& timeout;
@@ -66,6 +150,8 @@
{}
};
+}
+
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -89,6 +175,7 @@
// connector thread does not call on us while the destructor
// is running.
if (connector) connector->close();
+ theIO().sub();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
@@ -131,11 +218,10 @@
}
bool ConnectionImpl::isOpen() const
-{
+{
return handler.isOpen();
}
-
void ConnectionImpl::open()
{
const std::string& protocol = handler.protocol;
@@ -143,7 +229,8 @@
int port = handler.port;
QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
- connector.reset(Connector::create(protocol, version, handler, this));
+ theIO().add();
+ connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
connector->connect(host, port);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 21 06:17:10 2010
@@ -21,32 +21,17 @@
#include "qpid/client/Connector.h"
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/Codec.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/SecurityLayer.h"
-#include "qpid/Msg.h"
-#include <iostream>
#include <map>
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
namespace qpid {
namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
-using boost::format;
-using boost::str;
-// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
namespace {
typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
@@ -57,13 +42,15 @@
}
}
-Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+Connector* Connector::create(const std::string& proto,
+ boost::shared_ptr<Poller> p,
+ framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
{
ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
if (i==theProtocolRegistry().end()) {
throw Exception(QPID_MSG("Unknown protocol: " << proto));
}
- return (i->second)(v, s, c);
+ return (i->second)(p, v, s, c);
}
void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
@@ -79,4 +66,5 @@
{
}
+
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 21 06:17:10 2010
@@ -22,27 +22,24 @@
#define _Connector_
-#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/sys/ShutdownHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/Time.h"
-#include <queue>
-#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <string>
+
namespace qpid {
namespace sys {
+class ShutdownHandler;
class SecurityLayer;
+class Poller;
+}
+
+namespace framing {
+class InputHandler;
+class AMQFrame;
}
namespace client {
@@ -52,11 +49,14 @@
///@internal
class Connector : public framing::OutputHandler
-{
+{
public:
// Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future)
- typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
- static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>,
+ framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+ static Connector* create(const std::string& proto,
+ boost::shared_ptr<qpid::sys::Poller>,
+ framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
@@ -75,7 +75,6 @@
virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
virtual unsigned int getSSF() = 0;
-
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Thu Jan 21 06:17:10 2010
@@ -26,6 +26,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
@@ -48,7 +49,7 @@
using boost::format;
using boost::str;
- class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable
+ class RdmaConnector : public Connector, public sys::Codec
{
struct Buff;
@@ -60,30 +61,25 @@
Frames frames;
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
- Bounds* bounds;
-
-
+ Bounds* bounds;
+
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex pollingLock;
+ sys::Mutex pollingLock;
bool polling;
- bool joined;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- sys::Thread receiver;
-
Rdma::AsynchIO* aio;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
- void run();
void handleClosed();
bool closeInternal();
@@ -101,7 +97,7 @@
std::string identifier;
ConnectionImpl* impl;
-
+
void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
@@ -120,15 +116,16 @@
bool canEncode();
public:
- RdmaConnector(framing::ProtocolVersion pVersion,
+ RdmaConnector(Poller::shared_ptr,
+ framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
// Static constructor which registers connector here
namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new RdmaConnector(v, s, c);
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new RdmaConnector(p, v, s, c);
}
struct StaticInit {
@@ -140,7 +137,8 @@
}
-RdmaConnector::RdmaConnector(ProtocolVersion ver,
+RdmaConnector::RdmaConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
@@ -150,9 +148,9 @@
version(ver),
initiated(false),
polling(false),
- joined(true),
shutdownHandler(0),
aio(0),
+ poller(p),
impl(cimpl)
{
QPID_LOG(debug, "RdmaConnector created for " << version);
@@ -165,8 +163,6 @@
void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
- assert(joined);
- poller = Poller::shared_ptr(new Poller);
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
Rdma::Connector* c = new Rdma::Connector(
@@ -179,8 +175,6 @@
c->start(poller);
polling = true;
- joined = false;
- receiver = Thread(this);
}
// The following only gets run when connected
@@ -215,24 +209,12 @@
}
bool RdmaConnector::closeInternal() {
- bool ret;
- {
Mutex::ScopedLock l(pollingLock);
- ret = polling;
- if (polling) {
- polling = false;
- poller->shutdown();
- }
- if (joined || receiver.id() == Thread::current().id()) {
- return ret;
- }
- joined = true;
- }
-
- receiver.join();
+ bool ret = polling;
+ polling = false;
return ret;
}
-
+
void RdmaConnector::close() {
closeInternal();
}
@@ -356,28 +338,6 @@
handleClosed();
}
-void RdmaConnector::run(){
- // Keep the connection impl in memory until run() completes.
- //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
- //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
- //assert(protect);
- try {
- Dispatcher d(poller);
-
- //aio->start(poller);
- d.run();
- //aio->queueForDeletion();
- } catch (const std::exception& e) {
- {
- // We're no longer polling
- Mutex::ScopedLock l(pollingLock);
- polling = false;
- }
- QPID_LOG(error, e.what());
- handleClosed();
- }
-}
-
void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Thu Jan 21 06:17:10 2010
@@ -28,6 +28,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
@@ -50,7 +51,7 @@
using boost::str;
-class SslConnector : public Connector, private sys::Runnable
+class SslConnector : public Connector
{
struct Buff;
@@ -68,27 +69,26 @@
framing::Buffer encode;
size_t framesEncoded;
std::string identifier;
- Bounds* bounds;
-
+ Bounds* bounds;
+
void writeOne();
void newBuffer();
public:
-
+
Writer(uint16_t maxFrameSize, Bounds*);
~Writer();
void init(std::string id, sys::ssl::SslIO*);
void handle(framing::AMQFrame&);
void write(sys::ssl::SslIO&);
};
-
+
const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex closedLock;
+ sys::Mutex closedLock;
bool closed;
- bool joined;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
@@ -96,20 +96,17 @@
framing::OutputHandler* output;
Writer writer;
-
- sys::Thread receiver;
sys::ssl::SslSocket socket;
sys::ssl::SslIO* aio;
- boost::shared_ptr<sys::Poller> poller;
+ Poller::shared_ptr poller;
~SslConnector();
- void run();
void handleClosed();
bool closeInternal();
-
+
void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
void writebuff(qpid::sys::ssl::SslIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
@@ -117,8 +114,6 @@
std::string identifier;
- ConnectionImpl* impl;
-
void connect(const std::string& host, int port);
void init();
void close();
@@ -133,15 +128,20 @@
unsigned int getSSF() { return socket.getKeyLen(); }
public:
- SslConnector(framing::ProtocolVersion pVersion,
+ SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
+struct SslConnector::Buff : public SslIO::BufferBase {
+ Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
// Static constructor which registers connector here
namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new SslConnector(v, s, c);
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new SslConnector(p, v, s, c);
}
struct StaticInit {
@@ -150,9 +150,9 @@
SslOptions options;
options.parse (0, 0, QPIDC_CONF_FILE, true);
if (options.certDbPath.empty()) {
- QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
+ QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
} else {
- initNSS(options);
+ initNSS(options);
Connector::registerFactory("ssl", &create);
}
} catch (const std::exception& e) {
@@ -164,18 +164,18 @@
} init;
}
-SslConnector::SslConnector(ProtocolVersion ver,
+SslConnector::SslConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
version(ver),
initiated(false),
closed(true),
- joined(true),
shutdownHandler(0),
writer(maxFrameSize, cimpl),
aio(0),
- impl(cimpl)
+ poller(p)
{
QPID_LOG(debug, "SslConnector created for " << version.toString());
//TODO: how do we want to handle socket configuration with ssl?
@@ -198,7 +198,6 @@
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
- poller = Poller::shared_ptr(new Poller);
aio = new SslIO(socket,
boost::bind(&SslConnector::readbuff, this, _1, _2),
boost::bind(&SslConnector::eof, this, _1),
@@ -211,11 +210,12 @@
void SslConnector::init(){
Mutex::ScopedLock l(closedLock);
- assert(joined);
ProtocolInitiation init(version);
writeDataBlock(init);
- joined = false;
- receiver = Thread(this);
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+ aio->start(poller);
}
bool SslConnector::closeInternal() {
@@ -224,16 +224,11 @@
if (!closed) {
closed = true;
aio->queueForDeletion();
- poller->shutdown();
- }
- if (!joined && receiver.id() != Thread::current().id()) {
- joined = true;
- Mutex::ScopedUnlock u(closedLock);
- receiver.join();
+ socket.close();
}
return ret;
}
-
+
void SslConnector::close() {
closeInternal();
}
@@ -267,11 +262,6 @@
shutdownHandler->shutdown();
}
-struct SslConnector::Buff : public SslIO::BufferBase {
- Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -376,25 +366,4 @@
handleClosed();
}
-void SslConnector::run(){
- // Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
- assert(protect);
- try {
- Dispatcher d(poller);
-
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- aio->start(poller);
- d.run();
- socket.close();
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- handleClosed();
- }
-}
-
-
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Thu Jan 21 06:17:10 2010
@@ -27,6 +27,7 @@
#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
@@ -45,10 +46,15 @@
using boost::format;
using boost::str;
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
// Static constructor which registers connector here
namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(v, s, c);
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(p, v, s, c);
}
struct StaticInit {
@@ -58,25 +64,20 @@
} init;
}
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
-TCPConnector::TCPConnector(ProtocolVersion ver,
- const ConnectionSettings& settings,
- ConnectionImpl* cimpl)
+TCPConnector::TCPConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
lastEof(0),
currentSize(0),
bounds(cimpl),
- version(ver),
+ version(ver),
initiated(false),
closed(true),
- joined(true),
shutdownHandler(0),
aio(0),
- impl(cimpl->shared_from_this())
+ poller(p)
{
QPID_LOG(debug, "TCPConnector created for " << version.toString());
settings.configureSocket(socket);
@@ -89,16 +90,13 @@
void TCPConnector::connect(const std::string& host, int port) {
Mutex::ScopedLock l(lock);
assert(closed);
- assert(joined);
- poller = Poller::shared_ptr(new Poller);
- AsynchConnector* c =
- AsynchConnector::create(socket,
- host, port,
- boost::bind(&TCPConnector::connected, this, _1),
- boost::bind(&TCPConnector::connectFailed, this, _3));
+ AsynchConnector* c = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&TCPConnector::connected, this, _1),
+ boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
- joined = false;
- receiver = Thread(this);
+
c->start(poller);
}
@@ -113,38 +111,31 @@
for (int i = 0; i < 32; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
- aio->start(poller);
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
ProtocolInitiation init(version);
writeDataBlock(init);
+
+ aio->start(poller);
}
void TCPConnector::connectFailed(const std::string& msg) {
QPID_LOG(warning, "Connecting failed: " << msg);
- closed = true;
- poller->shutdown();
- closeInternal();
- if (shutdownHandler)
+ socket.close();
+ if (!closed && shutdownHandler) {
+ closed = true;
shutdownHandler->shutdown();
+ }
}
bool TCPConnector::closeInternal() {
- bool ret;
- {
Mutex::ScopedLock l(lock);
- ret = !closed;
+ bool ret = !closed;
if (!closed) {
closed = true;
aio->queueForDeletion();
- poller->shutdown();
- }
- if (joined || receiver.id() == Thread::current().id()) {
- return ret;
- }
- joined = true;
+ socket.close();
}
- receiver.join();
return ret;
}
@@ -301,28 +292,10 @@
handleClosed();
}
-void TCPConnector::run() {
- // Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl.lock();
- assert(protect);
- try {
- Dispatcher d(poller);
-
- d.run();
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
- handleClosed();
- }
- try {
- socket.close();
- } catch (const std::exception&) {}
-}
-
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;
securityLayer->init(this);
}
-
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h Thu Jan 21 06:17:10 2010
@@ -40,9 +40,14 @@
#include <string>
namespace qpid {
+
+namespace framing {
+ class InitiationHandler;
+}
+
namespace client {
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
+class TCPConnector : public Connector, public sys::Codec
{
typedef std::deque<framing::AMQFrame> Frames;
struct Buff;
@@ -58,15 +63,12 @@
framing::ProtocolVersion version;
bool initiated;
bool closed;
- bool joined;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- sys::Thread receiver;
-
sys::Socket socket;
sys::AsynchIO* aio;
@@ -76,19 +78,16 @@
~TCPConnector();
- void run();
void handleClosed();
bool closeInternal();
- virtual void connected(const qpid::sys::Socket&);
+ void connected(const sys::Socket&);
void connectFailed(const std::string& msg);
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
- boost::weak_ptr<ConnectionImpl> impl;
-
void connect(const std::string& host, int port);
void close();
void send(framing::AMQFrame& frame);
@@ -107,9 +106,10 @@
bool canEncode();
public:
- TCPConnector(framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
- ConnectionImpl*);
+ TCPConnector(boost::shared_ptr<sys::Poller>,
+ framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
};
}} // namespace qpid::client
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org