You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/05/09 04:00:05 UTC

svn commit: r654666 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/sys: AsynchIO.h Dispatcher.cpp Socket.h TCPIOPlugin.cpp posix/AsynchIO.cpp posix/Socket.cpp

Author: astitcher
Date: Thu May  8 19:00:04 2008
New Revision: 654666

URL: http://svn.apache.org/viewvc?rev=654666&view=rev
Log:
QPID-1040: Patch from Ted Ross: Asynchronous Connector
Code to allow non-blocking connection of new sockets

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Thu May  8 19:00:04 2008
@@ -52,7 +52,34 @@
 };
 
 /*
- * Asycnchronous reader/writer: 
+ * Asynchronous connector: starts the process of initiating a connection and
+ * invokes a callback when completed or failed.
+ */
+class AsynchConnector : private DispatchHandle {
+public:
+    // Called with the socket once the connection attempt has succeeded.
+    typedef boost::function1<void, const Socket&> ConnectedCallback;
+    // Called with an error code and a descriptive message when the attempt fails.
+    typedef boost::function2<void, int, std::string> FailedCallback;
+
+private:
+    ConnectedCallback connCallback;
+    FailedCallback failCallback;
+    // Socket being connected; it is closed and deleted by this object if
+    // the attempt fails.
+    const Socket& socket;
+
+public:
+    /*
+     * Starts a non-blocking connect of socket to hostname:port and watches
+     * it on poller.  The object deletes itself once the attempt has
+     * completed or failed, so it is meant to be allocated with new and
+     * not referenced afterwards.  failCb may be omitted, in which case a
+     * failure is handled without notifying anyone.
+     */
+    AsynchConnector(const Socket& socket,
+                    Poller::shared_ptr poller,
+                    std::string hostname,
+                    uint16_t port,
+                    ConnectedCallback connCb,
+                    FailedCallback failCb = 0);
+
+private:
+    // Poller callback: inspects the socket error and dispatches to the
+    // connected callback or to failure().
+    void connComplete(DispatchHandle& handle);
+    // Reports the failure (if a callback was supplied), then releases the
+    // socket and this object.
+    void failure(int, std::string);
+};
+
+/*
+ * Asynchronous reader/writer: 
  * Reader accepts buffers to read into; reads into the provided buffers
  * and then does a callback with the buffer and amount read. Optionally it can callback
  * when there is something to read but no buffer to read it into.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Thu May  8 19:00:04 2008
@@ -322,8 +322,8 @@
 // is to ensure that the lock is released before
 // we do the delete
 void DispatchHandle::doDelete() {
-	// Ensure that we're no longer watching anything
-	stopWatch();
+    // Ensure that we're no longer watching anything
+    stopWatch();
 
     // If we're in the middle of a callback defer the delete
     {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Thu May  8 19:00:04 2008
@@ -93,7 +93,12 @@
     uint16_t getLocalPort() const;
     uint16_t getRemotePort() const;
 
-    
+    /**
+     * Returns the error code stored in the socket.  This may be used
+     * to determine the result of a non-blocking connect.
+     */
+    int getError() const;
+
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
      */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Thu May  8 19:00:04 2008
@@ -101,16 +101,20 @@
                            boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
     acceptor->start(poller);
 }
-    
+
 void AsynchIOProtocolFactory::connect(
     Poller::shared_ptr poller,
     const std::string& host, int16_t port,
     ConnectionCodec::Factory* f)
 {
-    Socket* socket = new Socket();//Should be deleted by handle when socket closes
-    socket->connect(host, port);
-
-    established(poller, *socket, f, true);
+    // Note that the following logic does not cause a memory leak.
+    // The allocated Socket is freed either by the AsynchConnector
+    // upon connection failure or by the AsynchIO upon connection
+    // shutdown.  The allocated AsynchConnector frees itself when it
+    // is no longer needed.
+    //
+    // NOTE(review): no FailedCallback is supplied here, so a failed
+    // connection attempt is cleaned up by the connector without this
+    // factory being told about it.
+    Socket* socket = new Socket();
+    new AsynchConnector(*socket, poller, host, port,
+                        boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true));
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Thu May  8 19:00:04 2008
@@ -97,6 +97,57 @@
 }
 
 /*
+ * Asynch Connector
+ */
+
+/*
+ * Puts the socket into non-blocking mode, initiates the connection and
+ * starts watching the handle on the poller.  The first callback slot of
+ * the DispatchHandle base is left empty (0); the remaining two are both
+ * bound to connComplete().
+ *
+ * If connect() throws, the failure path is taken immediately with error
+ * code -1 and the exception text as the message.
+ */
+AsynchConnector::AsynchConnector(const Socket& s,
+                                 Poller::shared_ptr poller,
+                                 std::string hostname,
+                                 uint16_t port,
+                                 ConnectedCallback connCb,
+                                 FailedCallback failCb) :
+    DispatchHandle(s,
+                   0,
+                   boost::bind(&AsynchConnector::connComplete, this, _1),
+                   boost::bind(&AsynchConnector::connComplete, this, _1)),
+    connCallback(connCb),
+    failCallback(failCb),
+    socket(s)
+{
+    // NOTE(review): setNonblocking() is outside the try block, so an
+    // exception from it propagates to the caller without failure() running.
+    socket.setNonblocking();
+    try {
+        socket.connect(hostname, port);
+        startWatch(poller);
+    } catch(std::exception& e) {
+        // NOTE(review): failure() ends with doDelete(), so this object may
+        // already be released when the constructor returns -- confirm that
+        // callers never use the pointer returned by new.
+        failure(-1, std::string(e.what()));
+    }
+}
+
+/*
+ * Callback registered with the DispatchHandle base: reads the socket's
+ * error code, stops watching the handle, and then either passes the socket
+ * to the connected callback or takes the failure path.
+ */
+void AsynchConnector::connComplete(DispatchHandle& h)
+{
+    int errCode = socket.getError();
+
+    h.stopWatch();
+    if (errCode == 0) {
+        connCallback(socket);
+        // Success: only the connector is released here; the socket is not
+        // closed or deleted on this path.
+        DispatchHandle::doDelete();
+    } else {
+        failure(errCode, std::string(strerror(errCode)));
+    }
+}
+
+/*
+ * Common failure path: notifies the failure callback when one was
+ * supplied, then closes and deletes the socket and finally releases this
+ * connector through doDelete().
+ */
+void AsynchConnector::failure(int errCode, std::string message)
+{
+    // The failure callback is optional (it defaults to 0 in the declaration).
+    if (failCallback)
+        failCallback(errCode, message);
+
+    // The socket is deleted here, so it must not be used by anyone after
+    // a failed attempt.
+    socket.close();
+    delete &socket;
+
+    DispatchHandle::doDelete();
+}
+
+/*
  * Asynch reader/writer
  */
 AsynchIO::AsynchIO(const Socket& s,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=654666&r1=654665&r2=654666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Thu May  8 19:00:04 2008
@@ -148,7 +148,8 @@
     if (hp == 0)
         throw Exception(QPID_MSG("Cannot resolve " << host << ": " << h_errstr(h_errno)));
     ::memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
-    if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+    if ((::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) &&
+        (errno != EINPROGRESS))
         throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port));
 }
 
@@ -257,6 +258,17 @@
     return atoi(getService(impl->fd, true).c_str());
 }
 
+/*
+ * Returns the value of the SO_ERROR socket option for this socket's file
+ * descriptor (0 when no error is recorded).  Throws QPID_POSIX_ERROR if the
+ * getsockopt() call itself fails.  Per POSIX, reading SO_ERROR also clears
+ * the pending error on the socket.
+ */
+int Socket::getError() const
+{
+    int       result;
+    socklen_t rSize = sizeof (result);
+
+    if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
+        throw QPID_POSIX_ERROR(errno);
+
+    return result;
+}
+
 void Socket::configure(const Configuration& c)
 {
     c.configurePosixTcpSocket(impl->fd);