You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/04/27 02:06:20 UTC

svn commit: r938271 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf: internal/net/tcp/TcpSocket.cpp internal/net/tcp/TcpSocket.h net/Socket.cpp

Author: tabish
Date: Tue Apr 27 00:06:20 2010
New Revision: 938271

URL: http://svn.apache.org/viewvc?rev=938271&view=rev
Log:
Removes some of the debug code and fixes Socket Connect and ServerSocket Accept so that they behave correctly when a Connect timeout or SoTimeout is set. Prior to this, small timeout values could result in exceptions that weren't really errors.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp Tue Apr 27 00:06:20 2010
@@ -61,11 +61,16 @@ using namespace decaf::lang::exceptions;
 ////////////////////////////////////////////////////////////////////////////////
 TcpSocket::TcpSocket() throw ( SocketException )
   : socketHandle( NULL ),
+    localAddress( NULL ),
+    remoteAddress( NULL ),
+    pollSet( NULL ),
     inputStream( NULL ),
     outputStream( NULL ),
     inputShutdown( false ),
     outputShutdown( false ),
-    closed( false ) {
+    closed( false ),
+    trafficClass( 0 ),
+    soTimeout( -1 ) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -102,13 +107,12 @@ void TcpSocket::create() throw( decaf::i
                 __FILE__, __LINE__, "The System level socket has already been created." );
         }
 
-        std::cout << "TcpSocket::create - Creating new Socket instance." << std::endl;
-
         // Create the actual socket.
         checkResult( apr_socket_create( &socketHandle, AF_INET, SOCK_STREAM,
                                         APR_PROTO_TCP, apr_pool.getAprPool() ) );
 
-        std::cout << "TcpSocket::create - Created new Socket instance." << std::endl;
+        // Create the pollset for the socket.
+        checkResult( apr_pollset_create( &pollSet, 1, apr_pool.getAprPool(), APR_POLLSET_NOCOPY ) );
     }
     DECAF_CATCH_RETHROW( decaf::io::IOException )
     DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -133,25 +137,60 @@ void TcpSocket::accept( SocketImpl* sock
 
         apr_status_t result = APR_SUCCESS;
 
-        std::cout << "TcpSocket::accept - Accepting new Socket instance." << std::endl;
+        if( this->soTimeout == -1 ) {
+
+            // ensure we are in a blocking accept mode.
+            apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
+            apr_socket_timeout_set( socketHandle, -1 );
+
+        } else {
+
+            // ensure we are in a non-blocking accept mode.
+            apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 1 );
+            apr_socket_timeout_set( socketHandle, 0 );
+
+            int num = 0;
+            const apr_pollfd_t* signalled = NULL;
+            apr_pollfd_t pfd = { apr_pool.getAprPool(),
+                                 APR_POLL_SOCKET,
+                                 APR_POLLIN | APR_POLLERR,
+                                 0, { NULL }, NULL };
+
+            pfd.desc.s = socketHandle;
+
+            // Add the socket to the pollset to check for APR_POLLIN (readable) or APR_POLLERR.
+            apr_pollset_add( pollSet, &pfd );
+
+            // Poll for the specified timeout, the value in APR is taken as Microseconds.
+            result = apr_pollset_poll( pollSet, soTimeout * 1000, &num, &signalled );
+
+            // Check for async close event.
+            if( closed ) {
+                return;
+            }
+
+            // Remove the socket from the pollset now.
+            apr_pollset_remove( pollSet, &pfd );
+
+            if( result == APR_TIMEUP ) {
+                close();
+                throw SocketTimeoutException(
+                     __FILE__, __LINE__,
+                     "Timed out while waiting for Socket to Connect." );
+            }
+        }
 
         // Loop to ignore any signal interruptions that occur during the operation.
         do {
             result = apr_socket_accept( &impl->socketHandle, socketHandle, apr_pool.getAprPool() );
         } while( result == APR_EINTR );
 
-        if( result == APR_EAGAIN ) {
-            std::cout << "Server Socket Accept indicates it would block." << std::endl;
-        }
-
         if( result != APR_SUCCESS ) {
             throw SocketException(
                   __FILE__, __LINE__,
                   "ServerSocket::accept - %s",
                   SocketError::getErrorString().c_str() );
         }
-
-        std::cout << "TcpSocket::accept - Accepted new Socket instance." << std::endl;
     }
     DECAF_CATCH_RETHROW( decaf::io::IOException )
     DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -176,10 +215,6 @@ void TcpSocket::bind( const std::string&
 
         const char* host = ipaddress.empty() ? NULL : ipaddress.c_str();
 
-        std::cout << "Attempting to Bind Socket to IPAddress: "
-                  << ( ipaddress.empty() ? "NULL" : ipaddress )
-                  << ", on port: " << port << std::endl;
-
         // Create the Address Info for the Socket
         apr_status_t result = apr_sockaddr_info_get(
             &localAddress, host, APR_INET, (apr_port_t)port, 0, apr_pool.getAprPool() );
@@ -215,11 +250,6 @@ void TcpSocket::bind( const std::string&
             checkResult( apr_socket_addr_get( &localAddress, APR_LOCAL, socketHandle ) );
             this->localPort = localAddress->port;
         }
-
-        std::cout << "Successfully bound Socket to IPAddress: "
-                  << this->getLocalAddress()
-                  << ", on port: "
-                  << this->getLocalPort() << std::endl;
     }
     DECAF_CATCH_RETHROW( decaf::io::IOException )
     DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -229,6 +259,7 @@ void TcpSocket::bind( const std::string&
 ////////////////////////////////////////////////////////////////////////////////
 void TcpSocket::connect( const std::string& hostname, int port, int timeout )
     throw( decaf::io::IOException,
+           decaf::net::SocketTimeoutException,
            decaf::lang::exceptions::IllegalArgumentException ) {
 
     try{
@@ -243,49 +274,83 @@ void TcpSocket::connect( const std::stri
                 __FILE__, __LINE__, "The socket was not yet created." );
         }
 
-        std::cout << "TcpSocket::connect - Attempting to aqire address info for IPAddress: "
-                  << ( hostname.empty() ? "NULL" : hostname )
-                  << ", on port: " << port << std::endl;
-
         // Create the Address data
         checkResult( apr_sockaddr_info_get(
             &remoteAddress, hostname.c_str(), APR_INET, (apr_port_t)port, 0, apr_pool.getAprPool() ) );
 
-        std::cout << "TcpSocket::connect - Attempting to Connect Socket to IPAddress: "
-                  << ( hostname.empty() ? "NULL" : hostname )
-                  << ", on port: " << port << std::endl;
-
-        // To make blocking-with-timeout sockets, we have to set it to
-        // 'APR_SO_NONBLOCK==1(on) and timeout>0'. On Unix, we have no
-        // problem to specify 'APR_SO_NONBLOCK==0(off) and timeout>0'.
-        // Unfortunately, we have a problem on Windows. Setting the
-        // mode to 'APR_SO_NONBLOCK==0(off) and timeout>0' causes
-        // blocking-with-system-timeout sockets on Windows.
-        //
-        // http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html
-
-        // If we have a connection timeout specified, temporarily set the socket to
-        // non-blocking so that we can timeout the connect operation.  We'll restore
-        // to blocking mode right after we connect.
-        apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, (timeout > 0 ) ? 1 : 0 );
-        apr_socket_timeout_set( socketHandle, timeout );
+        int oldNonblockSetting = 0;
+        apr_interval_time_t oldTimeoutSetting = 0;
 
-        // try to Connect to the provided address.
-        checkResult(apr_socket_connect( socketHandle, remoteAddress ));
+        // Record the old settings.
+        apr_socket_opt_get( socketHandle, APR_SO_NONBLOCK, &oldNonblockSetting );
+        apr_socket_timeout_get( socketHandle, &oldTimeoutSetting );
+
+        // Timeout and non-timeout case require very different logic.
+        if( timeout <= 0 ) {
+
+            // Temporarily make it what we want, blocking with no timeout.
+            apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
+            apr_socket_timeout_set( socketHandle, -1 );
 
-        // Now that we are connected, we want to go back to blocking.
-        apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
-        apr_socket_timeout_set( socketHandle, -1 );
+            // try to Connect to the provided address.
+            checkResult( apr_socket_connect( socketHandle, remoteAddress ) );
+
+        } else {
+
+            // Temporarily make it what we want, non-blocking with a zero timeout.
+            apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 1 );
+            apr_socket_timeout_set( socketHandle, 0 );
+
+            apr_status_t result = apr_socket_connect( socketHandle, remoteAddress );
+
+            // Special case, it connected, usually doesn't happen.
+            if( result == APR_SUCCESS ) {
+                return;
+            } else if( APR_STATUS_IS_EINPROGRESS( result ) ) {
+
+                int num = 0;
+                const apr_pollfd_t* signalled = NULL;
+                apr_pollfd_t pfd = { apr_pool.getAprPool(), APR_POLL_SOCKET, APR_POLLOUT, 0, { NULL }, NULL };
+
+                pfd.desc.s = socketHandle;
+
+                // Add the socket to pollset to check APR_POLLOUT(writable)
+                apr_pollset_add( pollSet, &pfd );
+
+                // Poll for the specified timeout, the value in APR is taken as Microseconds.
+                result = apr_pollset_poll( pollSet, timeout * 1000, &num, &signalled );
+
+                // Remove the socket from the pollset now.
+                apr_pollset_remove( pollSet, &pfd );
+
+                if( result != APR_SUCCESS || num == 0 ) {
+                    close();
+                    throw SocketTimeoutException(
+                         __FILE__, __LINE__,
+                         "Timed out while waiting for Socket to Connect." );
+                }
+
+            } else {
+                close();
+                throw SocketException(
+                    __FILE__, __LINE__, "Error while attempting to connect to remote host.",
+                    SocketError::getErrorString().c_str() );
+            }
+        }
+
+        // Now that we are connected, we want to go back to old settings.
+        apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, oldNonblockSetting );
+        apr_socket_timeout_set( socketHandle, oldTimeoutSetting );
 
         // Create an input/output stream for this socket.
         inputStream = new TcpSocketInputStream( this );
         outputStream = new TcpSocketOutputStream( this );
 
-        std::cout << "TcpSocket::connect - Connected new Socket to IPAddress: "
-                  << ( hostname.empty() ? "NULL" : hostname )
-                  << ", on port: " << port << std::endl;
-
-    } catch( SocketException& ex ) {
+    } catch( IOException& ex ) {
+        ex.setMark( __FILE__, __LINE__);
+        try{ close(); } catch( lang::Exception& cx){ /* Absorb */ }
+        throw ex;
+    } catch( Exception& ex ) {
         ex.setMark( __FILE__, __LINE__);
         try{ close(); } catch( lang::Exception& cx){ /* Absorb */ }
         throw ex;
@@ -322,8 +387,6 @@ void TcpSocket::listen( int backlog ) th
                 __FILE__, __LINE__, "The stream is closed" );
         }
 
-        std::cout << "TcpSocket::listen - Setting up listen on Socket: backlog = " << backlog << std::endl;
-
         // Setup the listen for incoming connection requests
         apr_status_t result = apr_socket_listen( socketHandle, backlog );
 
@@ -333,8 +396,6 @@ void TcpSocket::listen( int backlog ) th
                 __FILE__, __LINE__, "Error on Bind - %s",
                 SocketError::getErrorString().c_str() );
         }
-
-        std::cout << "TcpSocket::listen - Now listening on Socket:" << std::endl;
     }
     DECAF_CATCH_RETHROW( decaf::io::IOException )
     DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -410,6 +471,10 @@ void TcpSocket::close() throw( decaf::io
 
     try{
 
+        if( this->closed ) {
+            return;
+        }
+
         this->closed = true;
 
         // Destroy the input stream.
@@ -429,6 +494,12 @@ void TcpSocket::close() throw( decaf::io
             apr_socket_close( socketHandle );
             socketHandle = NULL;
         }
+
+        // Destroy the pollset
+        if( pollSet != NULL ) {
+            apr_pollset_destroy( pollSet );
+            pollSet = NULL;
+        }
     }
     DECAF_CATCH_RETHROW( decaf::io::IOException )
     DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -515,6 +586,7 @@ void TcpSocket::setOption( int option, i
         if( option == SocketOptions::SOCKET_OPTION_TIMEOUT ) {
             // Time in APR for sockets is in microseconds so multiply by 1000.
             checkResult( apr_socket_timeout_set( socketHandle, value * 1000 ) );
+            this->soTimeout = value;
             return;
         }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h Tue Apr 27 00:06:20 2010
@@ -25,8 +25,10 @@
 #include <decaf/internal/AprPool.h>
 
 #include <apr_network_io.h>
+#include <apr_poll.h>
 
 #include <decaf/io/IOException.h>
+#include <decaf/net/SocketTimeoutException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/IndexOutOfBoundsException.h>
 
@@ -78,6 +80,11 @@ namespace tcp {
         SocketAddress remoteAddress;
 
         /**
+         * APR Pollset used for connect and accept when soTimeout is set.
+         */
+        apr_pollset_t* pollSet;
+
+        /**
          * The input stream for reading this socket.
          */
         TcpSocketInputStream* inputStream;
@@ -107,12 +114,17 @@ namespace tcp {
          */
         int trafficClass;
 
+        /**
+         * value of soTimeout used to handle timeout on accept calls.
+         */
+        int soTimeout;
+
     public:
 
         /**
          * Construct a non-connected socket.
-         * @throws SocketException thrown one windows if the static initialization
-         * call to WSAStartup was not successful.
+         *
+         * @throws SocketException thrown if an error occurs while creating the Socket.
          */
         TcpSocket() throw ( decaf::net::SocketException );
 
@@ -169,6 +181,7 @@ namespace tcp {
          */
         virtual void connect( const std::string& hostname, int port, int timeout )
             throw( decaf::io::IOException,
+                   decaf::net::SocketTimeoutException,
                    decaf::lang::exceptions::IllegalArgumentException );
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp Tue Apr 27 00:06:20 2010
@@ -178,9 +178,7 @@ void Socket::bind( const std::string& ip
         ensureCreated();
 
         try {
-            std::cout << "Socket::bind - Binding to " << ipaddress << ":" << port << std::endl;
             this->impl->bind( ipaddress, port );
-            std::cout << "Socket::bind - Bound to " << ipaddress << ":" << port << std::endl;
             this->bound = true;
         } catch( IOException& e ) {
             this->impl->close();
@@ -255,10 +253,8 @@ void Socket::connect( const std::string&
                 this->bound = true;
             }
 
-            std::cout << "Socket::connect - Connecting to " << host << ":" << port << std::endl;
             this->impl->connect( host, port, timeout );
             this->connected = true;
-            std::cout << "Socket::connect - Connected to " << host << ":" << port << std::endl;
 
         } catch( IOException& ex ) {
             this->impl->close();