You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/10/09 05:37:59 UTC

svn commit: r823389 - /qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp

Author: astitcher
Date: Fri Oct  9 03:37:59 2009
New Revision: 823389

URL: http://svn.apache.org/viewvc?rev=823389&view=rev
Log:
Split TCP connect so that the connect is non-blocking
    - This means it can now be interrupted by heartbeat abort

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=823389&r1=823388&r2=823389&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Oct  9 03:37:59 2009
@@ -51,10 +51,10 @@
 // Stuff for the registry of protocol connectors (maybe should be moved to its own file)
 namespace {
     typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-    
+
     ProtocolRegistry& theProtocolRegistry() {
         static ProtocolRegistry protocolRegistry;
-        
+
         return protocolRegistry;
     } 
 }
@@ -93,7 +93,7 @@
     size_t lastEof; // Position after last EOF in frames
     uint64_t currentSize;
     Bounds* bounds;
-    
+
     framing::ProtocolVersion version;
     bool initiated;
     bool closed;
@@ -118,16 +118,17 @@
     void run();
     void handleClosed();
     bool closeInternal();
-    
+
+    void connected(const Socket&);
+    void connectFailed(const std::string& msg);
     bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
     void writebuff(qpid::sys::AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
     void eof(qpid::sys::AsynchIO&);
 
     boost::weak_ptr<ConnectionImpl> impl;
-    
+
     void connect(const std::string& host, int port);
-    void init();
     void close();
     void send(framing::AMQFrame& frame);
     void abort();
@@ -142,7 +143,6 @@
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool canEncode();
-    
 
 public:
     TCPConnector(framing::ProtocolVersion pVersion,
@@ -163,6 +163,11 @@
     } init;
 }
 
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+    ~Buff() { delete [] bytes;}
+};
+
 TCPConnector::TCPConnector(ProtocolVersion ver,
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
@@ -189,15 +194,19 @@
 void TCPConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(lock);
     assert(closed);
-    try {
-        socket.connect(host, port);
-    } catch (const std::exception& /*e*/) {
-        socket.close();
-        throw;
-    }
-
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+    assert(joined);
     poller = Poller::shared_ptr(new Poller);
+    AsynchConnector::create(socket,
+                            poller,
+                            host, port,
+                            boost::bind(&TCPConnector::connected, this, _1),
+                            boost::bind(&TCPConnector::connectFailed, this, _3));
+    closed = false;
+    joined = false;
+    receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
     aio = AsynchIO::create(socket,
                        boost::bind(&TCPConnector::readbuff, this, _1, _2),
                        boost::bind(&TCPConnector::eof, this, _1),
@@ -205,16 +214,23 @@
                        0, // closed
                        0, // nobuffs
                        boost::bind(&TCPConnector::writebuff, this, _1));
-    closed = false;
-}
+    for (int i = 0; i < 32; i++) {
+        aio->queueReadBuffer(new Buff(maxFrameSize));
+    }
+    aio->start(poller);
 
-void TCPConnector::init(){
-    Mutex::ScopedLock l(lock);
-    assert(joined);
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     ProtocolInitiation init(version);
     writeDataBlock(init);
-    joined = false;
-    receiver = Thread(this);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+    QPID_LOG(warning, "Connecting failed: " << msg);
+    closed = true;
+    poller->shutdown();
+    closeInternal();
+    if (shutdownHandler)
+        shutdownHandler->shutdown();
 }
 
 bool TCPConnector::closeInternal() {
@@ -235,7 +251,7 @@
     receiver.join();
     return ret;
 }
-        
+
 void TCPConnector::close() {
     closeInternal();
 }
@@ -243,7 +259,13 @@
 void TCPConnector::abort() {
     // Can't abort a closed connection
     if (!closed) {
-        aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+        if (aio) {
+            // Established connection
+            aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+        } else {
+            // We're still connecting
+            connectFailed("Connection timedout");
+        }
     }
 }
 
@@ -288,18 +310,13 @@
         shutdownHandler->shutdown();
 }
 
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}    
-    ~Buff() { delete [] bytes;}
-};
-
 void TCPConnector::writebuff(AsynchIO& /*aio*/) 
 {
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
     if (codec->canEncode()) {
         std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
         if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-        
+
         size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
 
         buffer->dataStart = 0;
@@ -395,11 +412,6 @@
     try {
         Dispatcher d(poller);
 
-        for (int i = 0; i < 32; i++) {
-            aio->queueReadBuffer(new Buff(maxFrameSize));
-        }
-
-        aio->start(poller);
         d.run();
     } catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org