You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/04/27 02:06:20 UTC
svn commit: r938271 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf:
internal/net/tcp/TcpSocket.cpp internal/net/tcp/TcpSocket.h net/Socket.cpp
Author: tabish
Date: Tue Apr 27 00:06:20 2010
New Revision: 938271
URL: http://svn.apache.org/viewvc?rev=938271&view=rev
Log:
Removes some of the debug code and fixes Socket Connect and ServerSocket Accept so that they behave correctly when a Connect timeout or SoTimeout is set. Prior to this change, small timeout values could result in exceptions that weren't really errors.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp Tue Apr 27 00:06:20 2010
@@ -61,11 +61,16 @@ using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
TcpSocket::TcpSocket() throw ( SocketException )
: socketHandle( NULL ),
+ localAddress( NULL ),
+ remoteAddress( NULL ),
+ pollSet( NULL ),
inputStream( NULL ),
outputStream( NULL ),
inputShutdown( false ),
outputShutdown( false ),
- closed( false ) {
+ closed( false ),
+ trafficClass( 0 ),
+ soTimeout( -1 ) {
}
////////////////////////////////////////////////////////////////////////////////
@@ -102,13 +107,12 @@ void TcpSocket::create() throw( decaf::i
__FILE__, __LINE__, "The System level socket has already been created." );
}
- std::cout << "TcpSocket::create - Creating new Socket instance." << std::endl;
-
// Create the actual socket.
checkResult( apr_socket_create( &socketHandle, AF_INET, SOCK_STREAM,
APR_PROTO_TCP, apr_pool.getAprPool() ) );
- std::cout << "TcpSocket::create - Created new Socket instance." << std::endl;
+ // Create the pollset for the socket.
+ checkResult( apr_pollset_create( &pollSet, 1, apr_pool.getAprPool(), APR_POLLSET_NOCOPY ) );
}
DECAF_CATCH_RETHROW( decaf::io::IOException )
DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -133,25 +137,60 @@ void TcpSocket::accept( SocketImpl* sock
apr_status_t result = APR_SUCCESS;
- std::cout << "TcpSocket::accept - Accepting new Socket instance." << std::endl;
+ if( this->soTimeout == -1 ) {
+
+ // ensure we are in a blocking accept mode.
+ apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
+ apr_socket_timeout_set( socketHandle, -1 );
+
+ } else {
+
+ // ensure we are in a non-blocking accept mode.
+ apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 1 );
+ apr_socket_timeout_set( socketHandle, 0 );
+
+ int num = 0;
+ const apr_pollfd_t* signalled = NULL;
+ apr_pollfd_t pfd = { apr_pool.getAprPool(),
+ APR_POLL_SOCKET,
+ APR_POLLIN | APR_POLLERR,
+ 0, { NULL }, NULL };
+
+ pfd.desc.s = socketHandle;
+
+ // Add the socket to pollset to check APR_POLLIN(readable) and APR_POLLERR
+ apr_pollset_add( pollSet, &pfd );
+
+ // Poll for the specified timeout, the value in APR is taken as Microseconds.
+ result = apr_pollset_poll( pollSet, soTimeout * 1000, &num, &signalled );
+
+ // Check for async close event.
+ if( closed ) {
+ return;
+ }
+
+ // Remove the socket from the pollset now.
+ apr_pollset_remove( pollSet, &pfd );
+
+ if( result == APR_TIMEUP ) {
+ close();
+ throw SocketTimeoutException(
+ __FILE__, __LINE__,
+ "Timed out while waiting for Socket to Connect." );
+ }
+ }
// Loop to ignore any signal interruptions that occur during the operation.
do {
result = apr_socket_accept( &impl->socketHandle, socketHandle, apr_pool.getAprPool() );
} while( result == APR_EINTR );
- if( result == APR_EAGAIN ) {
- std::cout << "Server Socket Accept indicates it would block." << std::endl;
- }
-
if( result != APR_SUCCESS ) {
throw SocketException(
__FILE__, __LINE__,
"ServerSocket::accept - %s",
SocketError::getErrorString().c_str() );
}
-
- std::cout << "TcpSocket::accept - Accepted new Socket instance." << std::endl;
}
DECAF_CATCH_RETHROW( decaf::io::IOException )
DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -176,10 +215,6 @@ void TcpSocket::bind( const std::string&
const char* host = ipaddress.empty() ? NULL : ipaddress.c_str();
- std::cout << "Attempting to Bind Socket to IPAddress: "
- << ( ipaddress.empty() ? "NULL" : ipaddress )
- << ", on port: " << port << std::endl;
-
// Create the Address Info for the Socket
apr_status_t result = apr_sockaddr_info_get(
&localAddress, host, APR_INET, (apr_port_t)port, 0, apr_pool.getAprPool() );
@@ -215,11 +250,6 @@ void TcpSocket::bind( const std::string&
checkResult( apr_socket_addr_get( &localAddress, APR_LOCAL, socketHandle ) );
this->localPort = localAddress->port;
}
-
- std::cout << "Successfully bound Socket to IPAddress: "
- << this->getLocalAddress()
- << ", on port: "
- << this->getLocalPort() << std::endl;
}
DECAF_CATCH_RETHROW( decaf::io::IOException )
DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -229,6 +259,7 @@ void TcpSocket::bind( const std::string&
////////////////////////////////////////////////////////////////////////////////
void TcpSocket::connect( const std::string& hostname, int port, int timeout )
throw( decaf::io::IOException,
+ decaf::net::SocketTimeoutException,
decaf::lang::exceptions::IllegalArgumentException ) {
try{
@@ -243,49 +274,83 @@ void TcpSocket::connect( const std::stri
__FILE__, __LINE__, "The socket was not yet created." );
}
- std::cout << "TcpSocket::connect - Attempting to aqire address info for IPAddress: "
- << ( hostname.empty() ? "NULL" : hostname )
- << ", on port: " << port << std::endl;
-
// Create the Address data
checkResult( apr_sockaddr_info_get(
&remoteAddress, hostname.c_str(), APR_INET, (apr_port_t)port, 0, apr_pool.getAprPool() ) );
- std::cout << "TcpSocket::connect - Attempting to Connect Socket to IPAddress: "
- << ( hostname.empty() ? "NULL" : hostname )
- << ", on port: " << port << std::endl;
-
- // To make blocking-with-timeout sockets, we have to set it to
- // 'APR_SO_NONBLOCK==1(on) and timeout>0'. On Unix, we have no
- // problem to specify 'APR_SO_NONBLOCK==0(off) and timeout>0'.
- // Unfortunately, we have a problem on Windows. Setting the
- // mode to 'APR_SO_NONBLOCK==0(off) and timeout>0' causes
- // blocking-with-system-timeout sockets on Windows.
- //
- // http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html
-
- // If we have a connection timeout specified, temporarily set the socket to
- // non-blocking so that we can timeout the connect operation. We'll restore
- // to blocking mode right after we connect.
- apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, (timeout > 0 ) ? 1 : 0 );
- apr_socket_timeout_set( socketHandle, timeout );
+ int oldNonblockSetting = 0;
+ apr_interval_time_t oldTimeoutSetting = 0;
- // try to Connect to the provided address.
- checkResult(apr_socket_connect( socketHandle, remoteAddress ));
+ // Record the old settings.
+ apr_socket_opt_get( socketHandle, APR_SO_NONBLOCK, &oldNonblockSetting );
+ apr_socket_timeout_get( socketHandle, &oldTimeoutSetting );
+
+ // Timeout and non-timeout case require very different logic.
+ if( timeout <= 0 ) {
+
+ // Temporarily make it what we want, blocking with no timeout.
+ apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
+ apr_socket_timeout_set( socketHandle, -1 );
- // Now that we are connected, we want to go back to blocking.
- apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 0 );
- apr_socket_timeout_set( socketHandle, -1 );
+ // try to Connect to the provided address.
+ checkResult( apr_socket_connect( socketHandle, remoteAddress ) );
+
+ } else {
+
+ // Temporarily make it what we want, non-blocking with a zero timeout.
+ apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, 1 );
+ apr_socket_timeout_set( socketHandle, 0 );
+
+ apr_status_t result = apr_socket_connect( socketHandle, remoteAddress );
+
+ // Special case, it connected, usually doesn't happen.
+ if( result == APR_SUCCESS ) {
+ return;
+ } else if( APR_STATUS_IS_EINPROGRESS( result ) ) {
+
+ int num = 0;
+ const apr_pollfd_t* signalled = NULL;
+ apr_pollfd_t pfd = { apr_pool.getAprPool(), APR_POLL_SOCKET, APR_POLLOUT, 0, { NULL }, NULL };
+
+ pfd.desc.s = socketHandle;
+
+ // Add the socket to pollset to check APR_POLLOUT(writable)
+ apr_pollset_add( pollSet, &pfd );
+
+ // Poll for the specified timeout, the value in APR is taken as Microseconds.
+ result = apr_pollset_poll( pollSet, timeout * 1000, &num, &signalled );
+
+ // Remove the socket from the pollset now.
+ apr_pollset_remove( pollSet, &pfd );
+
+ if( result != APR_SUCCESS || num == 0 ) {
+ close();
+ throw SocketTimeoutException(
+ __FILE__, __LINE__,
+ "Timed out while waiting for Socket to Connect." );
+ }
+
+ } else {
+ close();
+ throw SocketException(
+ __FILE__, __LINE__, "Error while attempting to connect to remote host.",
+ SocketError::getErrorString().c_str() );
+ }
+ }
+
+ // Now that we are connected, we want to go back to old settings.
+ apr_socket_opt_set( socketHandle, APR_SO_NONBLOCK, oldNonblockSetting );
+ apr_socket_timeout_set( socketHandle, oldTimeoutSetting );
// Create an input/output stream for this socket.
inputStream = new TcpSocketInputStream( this );
outputStream = new TcpSocketOutputStream( this );
- std::cout << "TcpSocket::connect - Connected new Socket to IPAddress: "
- << ( hostname.empty() ? "NULL" : hostname )
- << ", on port: " << port << std::endl;
-
- } catch( SocketException& ex ) {
+ } catch( IOException& ex ) {
+ ex.setMark( __FILE__, __LINE__);
+ try{ close(); } catch( lang::Exception& cx){ /* Absorb */ }
+ throw ex;
+ } catch( Exception& ex ) {
ex.setMark( __FILE__, __LINE__);
try{ close(); } catch( lang::Exception& cx){ /* Absorb */ }
throw ex;
@@ -322,8 +387,6 @@ void TcpSocket::listen( int backlog ) th
__FILE__, __LINE__, "The stream is closed" );
}
- std::cout << "TcpSocket::listen - Setting up listen on Socket: backlog = " << backlog << std::endl;
-
// Setup the listen for incoming connection requests
apr_status_t result = apr_socket_listen( socketHandle, backlog );
@@ -333,8 +396,6 @@ void TcpSocket::listen( int backlog ) th
__FILE__, __LINE__, "Error on Bind - %s",
SocketError::getErrorString().c_str() );
}
-
- std::cout << "TcpSocket::listen - Now listening on Socket:" << std::endl;
}
DECAF_CATCH_RETHROW( decaf::io::IOException )
DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -410,6 +471,10 @@ void TcpSocket::close() throw( decaf::io
try{
+ if( this->closed ) {
+ return;
+ }
+
this->closed = true;
// Destroy the input stream.
@@ -429,6 +494,12 @@ void TcpSocket::close() throw( decaf::io
apr_socket_close( socketHandle );
socketHandle = NULL;
}
+
+ // Destroy the pollset
+ if( pollSet != NULL ) {
+ apr_pollset_destroy( pollSet );
+ pollSet = NULL;
+ }
}
DECAF_CATCH_RETHROW( decaf::io::IOException )
DECAF_CATCH_EXCEPTION_CONVERT( Exception, decaf::io::IOException )
@@ -515,6 +586,7 @@ void TcpSocket::setOption( int option, i
if( option == SocketOptions::SOCKET_OPTION_TIMEOUT ) {
// Time in APR for sockets is in microseconds so multiply by 1000.
checkResult( apr_socket_timeout_set( socketHandle, value * 1000 ) );
+ this->soTimeout = value;
return;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.h Tue Apr 27 00:06:20 2010
@@ -25,8 +25,10 @@
#include <decaf/internal/AprPool.h>
#include <apr_network_io.h>
+#include <apr_poll.h>
#include <decaf/io/IOException.h>
+#include <decaf/net/SocketTimeoutException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/IndexOutOfBoundsException.h>
@@ -78,6 +80,11 @@ namespace tcp {
SocketAddress remoteAddress;
/**
+ * APR Pollset used for connect and accept when soTimeout is set.
+ */
+ apr_pollset_t* pollSet;
+
+ /**
* The input stream for reading this socket.
*/
TcpSocketInputStream* inputStream;
@@ -107,12 +114,17 @@ namespace tcp {
*/
int trafficClass;
+ /**
+ * value of soTimeout used to handle timeout on accept calls.
+ */
+ int soTimeout;
+
public:
/**
* Construct a non-connected socket.
- * @throws SocketException thrown one windows if the static initialization
- * call to WSAStartup was not successful.
+ *
+ * @throws SocketException thrown if an error occurs while creating the Socket.
*/
TcpSocket() throw ( decaf::net::SocketException );
@@ -169,6 +181,7 @@ namespace tcp {
*/
virtual void connect( const std::string& hostname, int port, int timeout )
throw( decaf::io::IOException,
+ decaf::net::SocketTimeoutException,
decaf::lang::exceptions::IllegalArgumentException );
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp?rev=938271&r1=938270&r2=938271&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/net/Socket.cpp Tue Apr 27 00:06:20 2010
@@ -178,9 +178,7 @@ void Socket::bind( const std::string& ip
ensureCreated();
try {
- std::cout << "Socket::bind - Binding to " << ipaddress << ":" << port << std::endl;
this->impl->bind( ipaddress, port );
- std::cout << "Socket::bind - Bound to " << ipaddress << ":" << port << std::endl;
this->bound = true;
} catch( IOException& e ) {
this->impl->close();
@@ -255,10 +253,8 @@ void Socket::connect( const std::string&
this->bound = true;
}
- std::cout << "Socket::connect - Connecting to " << host << ":" << port << std::endl;
this->impl->connect( host, port, timeout );
this->connected = true;
- std::cout << "Socket::connect - Connected to " << host << ":" << port << std::endl;
} catch( IOException& ex ) {
this->impl->close();