You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/01/21 07:17:29 UTC

svn commit: r901550 - in /qpid/trunk/qpid/cpp/src/qpid/client: ConnectionImpl.cpp Connector.cpp Connector.h RdmaConnector.cpp SslConnector.cpp TCPConnector.cpp TCPConnector.h

Author: astitcher
Date: Thu Jan 21 06:17:10 2010
New Revision: 901550

URL: http://svn.apache.org/viewvc?rev=901550&view=rev
Log:
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
  is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
  variable or the config file
- The client threads are initialised (via a singleton) when first
  used in a Connection::open()

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Jan 21 06:17:10 2010
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
+
 #include "qpid/client/ConnectionImpl.h"
+
 #include "qpid/client/Connector.h"
 #include "qpid/client/ConnectionSettings.h"
 #include "qpid/client/SessionImpl.h"
@@ -27,11 +29,20 @@
 #include "qpid/Url.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/Options.h"
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <limits>
+#include <vector>
+
+#ifdef HAVE_CONFIG_H
+#  include "config.h"
+#endif
 
 namespace qpid {
 namespace client {
@@ -41,7 +52,10 @@
 using namespace qpid::sys;
 using namespace qpid::framing::connection;//for connection error codes
 
-// Get timer singleton  
+namespace {
+// Maybe should amalgamate the singletons into a single client singleton
+
+// Get timer singleton
 Timer& theTimer() {
     static Mutex timerInitLock;
     ScopedLock<Mutex> l(timerInitLock);
@@ -50,6 +64,76 @@
     return t;
 }
 
+struct IOThreadOptions : public qpid::Options {
+    int maxIOThreads;
+
+    IOThreadOptions(int c) :
+        Options("IO threading options"),
+        maxIOThreads(c)
+    {
+        addOptions()
+            ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use");
+    }
+};
+
+// IO threads
+class IOThread {
+    int maxIOThreads;
+    int ioThreads;
+    int connections;
+    Mutex threadLock;
+    std::vector<Thread> t;
+    Poller::shared_ptr poller_;
+
+public:
+    void add() {
+        ScopedLock<Mutex> l(threadLock);
+        ++connections;
+        if (ioThreads < maxIOThreads) {
+            QPID_LOG(debug, "Created IO thread: " << ioThreads);
+            ++ioThreads;
+            t.push_back( Thread(poller_.get()) );
+        }
+    }
+
+    void sub() {
+        ScopedLock<Mutex> l(threadLock);
+        --connections;
+    }
+
+    Poller::shared_ptr poller() const {
+        return poller_;
+    }
+
+    // Here is where the maximum number of threads is set
+    IOThread(int c) :
+        ioThreads(0),
+        connections(0),
+        poller_(new Poller)
+    {
+        IOThreadOptions options(c);
+        options.parse(0, 0, QPIDC_CONF_FILE, true);
+        maxIOThreads = (options.maxIOThreads != -1) ?
+            options.maxIOThreads : 1;
+    }
+
+    // We can't destroy threads one-by-one as the only
+    // control we have is to shutdown the whole lot
+    // and we can't do that before we're unloaded as we can't
+    // restart the Poller after shutting it down
+    ~IOThread() {
+        poller_->shutdown();
+        for (int i=0; i<ioThreads; ++i) {
+            t[i].join();
+        }
+    }
+};
+
+IOThread& theIO() {
+    static IOThread io(SystemInfo::concurrency());
+    return io;
+}
+
 class HeartbeatTask : public TimerTask {
     TimeoutHandler& timeout;
 
@@ -66,6 +150,8 @@
     {}
 };
 
+}
+
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
@@ -89,6 +175,7 @@
     // connector thread does not call on us while the destructor
     // is running.
     if (connector) connector->close();
+    theIO().sub();
 }
 
 void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
@@ -131,11 +218,10 @@
 }
 
 bool ConnectionImpl::isOpen() const 
-{ 
+{
     return handler.isOpen();
 }
 
-
 void ConnectionImpl::open()
 {
     const std::string& protocol = handler.protocol;
@@ -143,7 +229,8 @@
     int port = handler.port;
     QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port);
 
-    connector.reset(Connector::create(protocol, version, handler, this)); 
+    theIO().add();
+    connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
     connector->setInputHandler(&handler);
     connector->setShutdownHandler(this);
     connector->connect(host, port);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 21 06:17:10 2010
@@ -21,32 +21,17 @@
 
 #include "qpid/client/Connector.h"
 
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/Codec.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/SecurityLayer.h"
-#include "qpid/Msg.h"
 
-#include <iostream>
 #include <map>
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
 
 namespace qpid {
 namespace client {
 
 using namespace qpid::sys;
 using namespace qpid::framing;
-using boost::format;
-using boost::str;
 
-// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
 namespace {
     typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
 
@@ -57,13 +42,15 @@
     } 
 }
 
-Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+Connector* Connector::create(const std::string& proto,
+                             boost::shared_ptr<Poller> p,
+                             framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
 {
     ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
     if (i==theProtocolRegistry().end()) {
         throw Exception(QPID_MSG("Unknown protocol: " << proto));
     }
-    return (i->second)(v, s, c);
+    return (i->second)(p, v, s, c);
 }
 
 void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
@@ -79,4 +66,5 @@
 {
 }
 
+
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 21 06:17:10 2010
@@ -22,27 +22,24 @@
 #define _Connector_
 
 
-#include "qpid/framing/InputHandler.h"
 #include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/ProtocolVersion.h"
-#include "qpid/sys/ShutdownHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/Time.h"
 
-#include <queue>
-#include <boost/weak_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
+#include <string>
+
 namespace qpid {
 
 namespace sys {
+class ShutdownHandler;
 class SecurityLayer;
+class Poller;
+}
+
+namespace framing {
+class InputHandler;
+class AMQFrame;
 }
 
 namespace client {
@@ -52,11 +49,14 @@
 
 ///@internal
 class Connector : public framing::OutputHandler
-{    
+{
   public:
     // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future)
-    typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
-    static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+    typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>,
+                               framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
+    static Connector* create(const std::string& proto,
+                             boost::shared_ptr<qpid::sys::Poller>,
+                             framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*);
     static void registerFactory(const std::string& proto, Factory* connectorFactory);
 
     virtual ~Connector() {};
@@ -75,7 +75,6 @@
     virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
 
     virtual unsigned int getSSF() = 0;
-
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Thu Jan 21 06:17:10 2010
@@ -26,6 +26,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
 #include "qpid/sys/rdma/RdmaIO.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
@@ -48,7 +49,7 @@
 using boost::format;
 using boost::str;
 
-  class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable
+  class RdmaConnector : public Connector, public sys::Codec
 {
     struct Buff;
 
@@ -60,30 +61,25 @@
     Frames frames;
     size_t lastEof; // Position after last EOF in frames
     uint64_t currentSize;
-    Bounds* bounds;        
-    
-    
+    Bounds* bounds;
+
     framing::ProtocolVersion version;
     bool initiated;
 
-    sys::Mutex pollingLock;    
+    sys::Mutex pollingLock;
     bool polling;
-    bool joined;
 
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
 
-    sys::Thread receiver;
-
     Rdma::AsynchIO* aio;
     sys::Poller::shared_ptr poller;
     std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
 
     ~RdmaConnector();
 
-    void run();
     void handleClosed();
     bool closeInternal();
 
@@ -101,7 +97,7 @@
     std::string identifier;
 
     ConnectionImpl* impl;
-    
+
     void connect(const std::string& host, int port);
     void close();
     void send(framing::AMQFrame& frame);
@@ -120,15 +116,16 @@
     bool canEncode();
 
 public:
-    RdmaConnector(framing::ProtocolVersion pVersion,
+    RdmaConnector(Poller::shared_ptr,
+              framing::ProtocolVersion pVersion,
               const ConnectionSettings&, 
               ConnectionImpl*);
 };
 
 // Static constructor which registers connector here
 namespace {
-    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
-        return new RdmaConnector(v, s, c);
+    Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new RdmaConnector(p, v, s, c);
     }
 
     struct StaticInit {
@@ -140,7 +137,8 @@
 }
 
 
-RdmaConnector::RdmaConnector(ProtocolVersion ver,
+RdmaConnector::RdmaConnector(Poller::shared_ptr p,
+                     ProtocolVersion ver,
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
@@ -150,9 +148,9 @@
       version(ver), 
       initiated(false),
       polling(false),
-      joined(true),
       shutdownHandler(0),
       aio(0),
+      poller(p),
       impl(cimpl)
 {
     QPID_LOG(debug, "RdmaConnector created for " << version);
@@ -165,8 +163,6 @@
 void RdmaConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(pollingLock);
     assert(!polling);
-    assert(joined);
-    poller = Poller::shared_ptr(new Poller);
 
     SocketAddress sa(host, boost::lexical_cast<std::string>(port));
     Rdma::Connector* c = new Rdma::Connector(
@@ -179,8 +175,6 @@
     c->start(poller);
 
     polling = true;
-    joined = false;
-    receiver = Thread(this);
 }
 
 // The following only gets run when connected
@@ -215,24 +209,12 @@
 }
 
 bool RdmaConnector::closeInternal() {
-    bool ret;
-    {
     Mutex::ScopedLock l(pollingLock);
-    ret = polling;
-    if (polling) {
-        polling = false;
-        poller->shutdown();
-    }
-    if (joined || receiver.id() == Thread::current().id()) {
-        return ret;
-    }
-    joined = true;
-    }
-
-    receiver.join();
+    bool ret = polling;
+    polling = false;
     return ret;
 }
-        
+
 void RdmaConnector::close() {
     closeInternal();
 }
@@ -356,28 +338,6 @@
     handleClosed();
 }
 
-void RdmaConnector::run(){
-    // Keep the connection impl in memory until run() completes.
-    //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
-    //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
-    //assert(protect);
-    try {
-        Dispatcher d(poller);
-	
-        //aio->start(poller);
-        d.run();
-        //aio->queueForDeletion();
-    } catch (const std::exception& e) {
-        {
-        // We're no longer polling
-        Mutex::ScopedLock l(pollingLock);
-        polling = false;
-        }
-        QPID_LOG(error, e.what());
-        handleClosed();
-    }
-}
-
 void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
 {
     securityLayer = sl;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Thu Jan 21 06:17:10 2010
@@ -28,6 +28,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
 #include "qpid/sys/ssl/util.h"
 #include "qpid/sys/ssl/SslIo.h"
 #include "qpid/sys/ssl/SslSocket.h"
@@ -50,7 +51,7 @@
 using boost::str;
 
 
-class SslConnector : public Connector, private sys::Runnable
+class SslConnector : public Connector
 {
     struct Buff;
 
@@ -68,27 +69,26 @@
         framing::Buffer encode;
         size_t framesEncoded;
         std::string identifier;
-        Bounds* bounds;        
-        
+        Bounds* bounds;
+
         void writeOne();
         void newBuffer();
 
       public:
-        
+
         Writer(uint16_t maxFrameSize, Bounds*);
         ~Writer();
         void init(std::string id, sys::ssl::SslIO*);
         void handle(framing::AMQFrame&);
         void write(sys::ssl::SslIO&);
     };
-    
+
     const uint16_t maxFrameSize;
     framing::ProtocolVersion version;
     bool initiated;
 
-    sys::Mutex closedLock;    
+    sys::Mutex closedLock;
     bool closed;
-    bool joined;
 
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
@@ -96,20 +96,17 @@
     framing::OutputHandler* output;
 
     Writer writer;
-    
-    sys::Thread receiver;
 
     sys::ssl::SslSocket socket;
 
     sys::ssl::SslIO* aio;
-    boost::shared_ptr<sys::Poller> poller;
+    Poller::shared_ptr poller;
 
     ~SslConnector();
 
-    void run();
     void handleClosed();
     bool closeInternal();
-    
+
     void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
     void writebuff(qpid::sys::ssl::SslIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
@@ -117,8 +114,6 @@
 
     std::string identifier;
 
-    ConnectionImpl* impl;
-    
     void connect(const std::string& host, int port);
     void init();
     void close();
@@ -133,15 +128,20 @@
     unsigned int getSSF() { return socket.getKeyLen(); }
 
 public:
-    SslConnector(framing::ProtocolVersion pVersion,
+    SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
               const ConnectionSettings&, 
               ConnectionImpl*);
 };
 
+struct SslConnector::Buff : public SslIO::BufferBase {
+    Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
+    ~Buff() { delete [] bytes;}
+};
+
 // Static constructor which registers connector here
 namespace {
-    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
-        return new SslConnector(v, s, c);
+    Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new SslConnector(p, v, s, c);
     }
 
     struct StaticInit {
@@ -150,9 +150,9 @@
                 SslOptions options;
                 options.parse (0, 0, QPIDC_CONF_FILE, true);
                 if (options.certDbPath.empty()) {
-                    QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");                    
+                    QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it.");
                 } else {
-                    initNSS(options);                
+                    initNSS(options);
                     Connector::registerFactory("ssl", &create);
                 }
             } catch (const std::exception& e) {
@@ -164,18 +164,18 @@
     } init;
 }
 
-SslConnector::SslConnector(ProtocolVersion ver,
+SslConnector::SslConnector(Poller::shared_ptr p,
+                     ProtocolVersion ver,
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
       version(ver), 
       initiated(false),
       closed(true),
-      joined(true),
       shutdownHandler(0),
       writer(maxFrameSize, cimpl),
       aio(0),
-      impl(cimpl)
+      poller(p)
 {
     QPID_LOG(debug, "SslConnector created for " << version.toString());
     //TODO: how do we want to handle socket configuration with ssl?
@@ -198,7 +198,6 @@
 
     identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     closed = false;
-    poller = Poller::shared_ptr(new Poller);
     aio = new SslIO(socket,
                        boost::bind(&SslConnector::readbuff, this, _1, _2),
                        boost::bind(&SslConnector::eof, this, _1),
@@ -211,11 +210,12 @@
 
 void SslConnector::init(){
     Mutex::ScopedLock l(closedLock);
-    assert(joined);
     ProtocolInitiation init(version);
     writeDataBlock(init);
-    joined = false;
-    receiver = Thread(this);
+    for (int i = 0; i < 32; i++) {
+        aio->queueReadBuffer(new Buff(maxFrameSize));
+    }
+    aio->start(poller);
 }
 
 bool SslConnector::closeInternal() {
@@ -224,16 +224,11 @@
     if (!closed) {
         closed = true;
         aio->queueForDeletion();
-        poller->shutdown();
-    }
-    if (!joined && receiver.id() != Thread::current().id()) {
-        joined = true;
-        Mutex::ScopedUnlock u(closedLock);
-        receiver.join();
+        socket.close();
     }
     return ret;
 }
-        
+
 void SslConnector::close() {
     closeInternal();
 }
@@ -267,11 +262,6 @@
         shutdownHandler->shutdown();
 }
 
-struct SslConnector::Buff : public SslIO::BufferBase {
-    Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}    
-    ~Buff() { delete [] bytes;}
-};
-
 SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
 {
 }
@@ -376,25 +366,4 @@
     handleClosed();
 }
 
-void SslConnector::run(){
-    // Keep the connection impl in memory until run() completes.
-    boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
-    assert(protect);
-    try {
-        Dispatcher d(poller);
-	
-        for (int i = 0; i < 32; i++) {
-            aio->queueReadBuffer(new Buff(maxFrameSize));
-        }
-	
-        aio->start(poller);
-        d.run();
-        socket.close();
-    } catch (const std::exception& e) {
-        QPID_LOG(error, e.what());
-        handleClosed();
-    }
-}
-
-
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Thu Jan 21 06:17:10 2010
@@ -27,6 +27,7 @@
 #include "qpid/sys/Codec.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
@@ -45,10 +46,15 @@
 using boost::format;
 using boost::str;
 
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+    ~Buff() { delete [] bytes;}
+};
+
 // Static constructor which registers connector here
 namespace {
-    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
-        return new TCPConnector(v, s, c);
+    Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new TCPConnector(p, v, s, c);
     }
 
     struct StaticInit {
@@ -58,25 +64,20 @@
     } init;
 }
 
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
-TCPConnector::TCPConnector(ProtocolVersion ver,
-                           const ConnectionSettings& settings,
-                           ConnectionImpl* cimpl)
+TCPConnector::TCPConnector(Poller::shared_ptr p,
+                     ProtocolVersion ver,
+                     const ConnectionSettings& settings,
+                     ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
       lastEof(0),
       currentSize(0),
       bounds(cimpl),
-      version(ver), 
+      version(ver),
       initiated(false),
       closed(true),
-      joined(true),
       shutdownHandler(0),
       aio(0),
-      impl(cimpl->shared_from_this())
+      poller(p)
 {
     QPID_LOG(debug, "TCPConnector created for " << version.toString());
     settings.configureSocket(socket);
@@ -89,16 +90,13 @@
 void TCPConnector::connect(const std::string& host, int port) {
     Mutex::ScopedLock l(lock);
     assert(closed);
-    assert(joined);
-    poller = Poller::shared_ptr(new Poller);
-    AsynchConnector* c =
-    AsynchConnector::create(socket,
-                            host, port,
-                            boost::bind(&TCPConnector::connected, this, _1),
-                            boost::bind(&TCPConnector::connectFailed, this, _3));
+    AsynchConnector* c = AsynchConnector::create(
+        socket,
+        host, port,
+        boost::bind(&TCPConnector::connected, this, _1),
+        boost::bind(&TCPConnector::connectFailed, this, _3));
     closed = false;
-    joined = false;
-    receiver = Thread(this);
+
     c->start(poller);
 }
 
@@ -113,38 +111,31 @@
     for (int i = 0; i < 32; i++) {
         aio->queueReadBuffer(new Buff(maxFrameSize));
     }
-    aio->start(poller);
 
     identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     ProtocolInitiation init(version);
     writeDataBlock(init);
+
+    aio->start(poller);
 }
 
 void TCPConnector::connectFailed(const std::string& msg) {
     QPID_LOG(warning, "Connecting failed: " << msg);
-    closed = true;
-    poller->shutdown();
-    closeInternal();
-    if (shutdownHandler)
+    socket.close();
+    if (!closed && shutdownHandler) {
+        closed = true;
         shutdownHandler->shutdown();
+    }
 }
 
 bool TCPConnector::closeInternal() {
-    bool ret;
-    {
     Mutex::ScopedLock l(lock);
-    ret = !closed;
+    bool ret = !closed;
     if (!closed) {
         closed = true;
         aio->queueForDeletion();
-        poller->shutdown();
-    }
-    if (joined || receiver.id() == Thread::current().id()) {
-        return ret;
-    }
-    joined = true;
+        socket.close();
     }
-    receiver.join();
     return ret;
 }
 
@@ -301,28 +292,10 @@
     handleClosed();
 }
 
-void TCPConnector::run() {
-    // Keep the connection impl in memory until run() completes.
-    boost::shared_ptr<ConnectionImpl> protect = impl.lock();
-    assert(protect);
-    try {
-        Dispatcher d(poller);
-
-        d.run();
-    } catch (const std::exception& e) {
-        QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
-        handleClosed();
-    }
-    try {
-        socket.close();
-    } catch (const std::exception&) {}
-}
-
 void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
 {
     securityLayer = sl;
     securityLayer->init(this);
 }
 
-
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h?rev=901550&r1=901549&r2=901550&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h Thu Jan 21 06:17:10 2010
@@ -40,9 +40,14 @@
 #include <string>
 
 namespace qpid {
+
+namespace framing {
+    class InitiationHandler;
+}
+
 namespace client {
 
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
+class TCPConnector : public Connector, public sys::Codec
 {
     typedef std::deque<framing::AMQFrame> Frames;
     struct Buff;
@@ -58,15 +63,12 @@
     framing::ProtocolVersion version;
     bool initiated;
     bool closed;
-    bool joined;
 
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
 
-    sys::Thread receiver;
-
     sys::Socket socket;
 
     sys::AsynchIO* aio;
@@ -76,19 +78,16 @@
 
     ~TCPConnector();
 
-    void run();
     void handleClosed();
     bool closeInternal();
 
-    virtual void connected(const qpid::sys::Socket&);
+    void connected(const sys::Socket&);
     void connectFailed(const std::string& msg);
     bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
     void writebuff(qpid::sys::AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
     void eof(qpid::sys::AsynchIO&);
 
-    boost::weak_ptr<ConnectionImpl> impl;
-
     void connect(const std::string& host, int port);
     void close();
     void send(framing::AMQFrame& frame);
@@ -107,9 +106,10 @@
     bool canEncode();
 
 public:
-    TCPConnector(framing::ProtocolVersion pVersion,
-                 const ConnectionSettings&, 
-                 ConnectionImpl*);
+    TCPConnector(boost::shared_ptr<sys::Poller>,
+              framing::ProtocolVersion pVersion,
+              const ConnectionSettings&,
+              ConnectionImpl*);
 };
 
 }}   // namespace qpid::client



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org