You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/05/03 01:47:15 UTC

svn commit: r534658 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: network/ transport/filters/

Author: tabish
Date: Wed May  2 16:47:15 2007
New Revision: 534658

URL: http://svn.apache.org/viewvc?view=rev&rev=534658
Log:
http://issues.apache.org/activemq/browse/AMQCPP-104

Adjusted the TCPTransport to close its socket before calling the next close.  
Fixed the Socket Streams to have closed flags that are checked when the socket ops fail.
Changed the close method in TCPSocket to close the streams, but not delete them so that users don't get access violation errors when closing the socket with threads using the streams, streams are deleted in the dtor and in the connect method.


Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.cpp Wed May  2 16:47:15 2007
@@ -17,7 +17,7 @@
 
 #include <activemq/util/Config.h>
 
-#if !defined(HAVE_WINSOCK2_H) 
+#if !defined(HAVE_WINSOCK2_H)
     #include <sys/select.h>
     #include <sys/socket.h>
 #else
@@ -52,27 +52,32 @@
 using namespace std;
 
 ////////////////////////////////////////////////////////////////////////////////
-SocketInputStream::SocketInputStream( network::Socket::SocketHandle socket )
-{
+SocketInputStream::SocketInputStream( network::Socket::SocketHandle socket ) {
+
     this->socket = socket;
-    //debug = true;
+    this->closed = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-SocketInputStream::~SocketInputStream()
-{
+SocketInputStream::~SocketInputStream() {}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketInputStream::close() throw( cms::CMSException ){
+    this->closed = true;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 std::size_t SocketInputStream::available() const throw (activemq::io::IOException){
 
 // The windows version
-#if defined(HAVE_WINSOCK2_H) 
+#if defined(HAVE_WINSOCK2_H)
 
     unsigned long numBytes = 0;
 
-    if (::ioctlsocket (socket, FIONREAD, &numBytes) == SOCKET_ERROR){
-        throw SocketException( __FILE__, __LINE__, "ioctlsocket failed" );
+    if( ::ioctlsocket(socket, FIONREAD, &numBytes ) == SOCKET_ERROR ){
+        throw SocketException(
+            __FILE__, __LINE__,
+            "SocketInputStream::available - ioctlsocket failed" );
     }
 
     return (std::size_t)numBytes;
@@ -81,19 +86,19 @@
 
     // If FIONREAD is defined - use ioctl to find out how many bytes
     // are available.
-	#if defined(FIONREAD)
+    #if defined(FIONREAD)
 
         std::size_t numBytes = 0;
-	    if( ::ioctl (socket, FIONREAD, &numBytes) != -1 ){
+        if( ::ioctl (socket, FIONREAD, &numBytes) != -1 ){
             return numBytes;
         }
-         
-	#endif
-	
-	// If we didn't get anything we can use select.  This is a little
+
+    #endif
+
+    // If we didn't get anything we can use select.  This is a little
     // less functional.  We will poll on the socket - if there is data
     // available, we'll return 1, otherwise we'll return zero.
-	#if defined(HAVE_SELECT)    
+    #if defined(HAVE_SELECT)
 
         fd_set rd;
         FD_ZERO(&rd);
@@ -106,56 +111,56 @@
             throw IOException(__FILE__, __LINE__, SocketError::getErrorString().c_str() );
         }
         return (returnCode == 0)? 0 : 1;
-        
+
     #else
-    
+
         return 0;
-        
-	#endif /* HAVE_SELECT */
-	
+
+    #endif /* HAVE_SELECT */
+
 
 #endif // !defined(HAVE_WINSOCK2_H)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 unsigned char SocketInputStream::read() throw (IOException){
-   
-    unsigned char c;  
+
+    unsigned char c;
     std::size_t len = read( &c, 1 );
-    if( len != sizeof(c) ){
-        throw IOException( __FILE__, __LINE__, 
+    if( len != sizeof(c) && !closed ){
+        throw IOException( __FILE__, __LINE__,
             "activemq::io::SocketInputStream::read - failed reading a byte");
     }
-   
+
     return c;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::size_t SocketInputStream::read( unsigned char* buffer, 
+std::size_t SocketInputStream::read( unsigned char* buffer,
                                      std::size_t bufferSize ) throw (IOException)
 {
     int len = 0;
-    
-    // Loop to ignore any signal interruptions that occur during the read.  
+
+    // Loop to ignore any signal interruptions that occur during the read.
     do {
-        
+
         // Read data from the socket.
         len = ::recv(socket, (char*)buffer, (int)bufferSize, 0);
-    
+
         // Check for a closed socket.
-        if( len == 0 ){
-            throw IOException( __FILE__, __LINE__, 
+        if( len == 0 && !closed ){
+            throw IOException( __FILE__, __LINE__,
                 "activemq::io::SocketInputStream::read - The connection is broken" );
         }
-            
-    } while( len == -1 && 
+
+    } while( !closed && len == -1 &&
              SocketError::getErrorCode() == SocketError::INTERRUPTED );
-        
+
     // Check for error.
-    if( len == -1 ){
-        
+    if( len == -1 && !closed ){
+
         // Otherwise, this was a bad error - throw an exception.
-        throw IOException( __FILE__, __LINE__, 
+        throw IOException( __FILE__, __LINE__,
                 "activemq::io::SocketInputStream::read - %s", SocketError::getErrorString().c_str() );
     }
 
@@ -163,9 +168,11 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::size_t SocketInputStream::skip( std::size_t num AMQCPP_UNUSED ) 
-throw ( io::IOException, exceptions::UnsupportedOperationException ) {
-    throw exceptions::UnsupportedOperationException(__FILE__, __LINE__, 
-        "skip() method is not supported"); 
+std::size_t SocketInputStream::skip( std::size_t num AMQCPP_UNUSED )
+    throw ( io::IOException, exceptions::UnsupportedOperationException ) {
+
+    throw exceptions::UnsupportedOperationException(
+        __FILE__, __LINE__,
+        "SocketInputStream::skip - skip() method is not supported");
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketInputStream.h Wed May  2 16:47:15 2007
@@ -17,40 +17,40 @@
 
 #ifndef ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
 #define ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_
- 
+
 #include <activemq/io/InputStream.h>
 #include <activemq/network/Socket.h>
 #include <activemq/concurrent/Mutex.h>
 
 namespace activemq{
 namespace network{
-    
+
     /**
      * Input stream for performing reads on a socket.  This
      * class will only work properly for blocking sockets.
      */
-	class SocketInputStream : public io::InputStream
-	{
-	private:
-	
-		// The socket handle.
-		Socket::SocketHandle socket;
-		concurrent::Mutex mutex;
-        //bool debug;
-		
-	public:
-	
-		/**
-		 * Constructor.
-		 * @param socket the socket handle.
-		 */
-		SocketInputStream( Socket::SocketHandle socket );
-		
-		/**
-		 * Destructor.
-		 */
-		virtual ~SocketInputStream();
-		
+    class SocketInputStream : public io::InputStream
+    {
+    private:
+
+        // The socket handle.
+        Socket::SocketHandle socket;
+        concurrent::Mutex mutex;
+        bool closed;
+
+    public:
+
+        /**
+         * Constructor.
+         * @param socket the socket handle.
+         */
+        SocketInputStream( Socket::SocketHandle socket );
+
+        /**
+         * Destructor.
+         */
+        virtual ~SocketInputStream();
+
         /**
          * Enables socket level output of the recieved data
          * @param debug true to turn on debugging
@@ -58,7 +58,7 @@
         /*virtual void setDebug( bool debug ){
             this->debug = debug;
         }*/
-        
+
         /**
          * Locks the object.
          * @throws ActiveMQException
@@ -66,15 +66,15 @@
         virtual void lock() throw( exceptions::ActiveMQException ){
             mutex.lock();
         }
-   
+
         /**
          * Unlocks the object.
          * @throws ActiveMQException
          */
-        virtual void unlock() throw( exceptions::ActiveMQException ){   
+        virtual void unlock() throw( exceptions::ActiveMQException ){
             mutex.unlock();
         }
-       
+
         /**
          * Waits on a signal from this object, which is generated
          * by a call to Notify.  Must have this object locked before
@@ -84,7 +84,7 @@
         virtual void wait() throw( exceptions::ActiveMQException ){
             mutex.wait();
         }
-    
+
         /**
          * Waits on a signal from this object, which is generated
          * by a call to Notify.  Must have this object locked before
@@ -93,9 +93,9 @@
          * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
          * @throws ActiveMQException
          */
-        virtual void wait( unsigned long millisecs ) 
+        virtual void wait( unsigned long millisecs )
             throw( exceptions::ActiveMQException ) {
-         
+
             mutex.wait( millisecs );
         }
 
@@ -108,7 +108,7 @@
         virtual void notify() throw( exceptions::ActiveMQException ){
             mutex.notify();
         }
-        
+
         /**
          * Signals the waiters on this object that it can now wake
          * up and continue.  Must have this object locked before
@@ -118,52 +118,52 @@
         virtual void notifyAll() throw( exceptions::ActiveMQException ){
             mutex.notifyAll();
         }
-	    
-	    /**
-	     * Returns the number of bytes available on the socket to
+
+        /**
+         * Returns the number of bytes available on the socket to
          * be read right now.
-	     * @return The number of bytes currently available to
+         * @return The number of bytes currently available to
          * be read on the socket.
-	     */
-		virtual std::size_t available() const throw (activemq::io::IOException);
-		
-		/**
-		 * Reads a single byte from the buffer.  If no data
+         */
+        virtual std::size_t available() const throw (activemq::io::IOException);
+
+        /**
+         * Reads a single byte from the buffer.  If no data
          * is available, blocks until their is.
-		 * @return The next byte.
-		 * @throws IOException thrown if an error occurs.
-		 */
-		virtual unsigned char read() throw ( io::IOException );
-		
-		/**
-		 * Reads an array of bytes from the buffer.  If no data
+         * @return The next byte.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual unsigned char read() throw ( io::IOException );
+
+        /**
+         * Reads an array of bytes from the buffer.  If no data
          * is available, blocks until there is.
-		 * @param buffer (out) the target buffer.
-		 * @param bufferSize the size of the output buffer.
-		 * @return The number of bytes read.
-		 * @throws IOException thrown if an error occurs.
-		 */
-		virtual std::size_t read( unsigned char* buffer, 
-                                  std::size_t bufferSize ) 
+         * @param buffer (out) the target buffer.
+         * @param bufferSize the size of the output buffer.
+         * @return The number of bytes read.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual std::size_t read( unsigned char* buffer,
+                                  std::size_t bufferSize )
             throw (io::IOException);
-		
-		/**
-		 * Close - does nothing.  It is the responsibility of the owner
-		 * of the socket object to close it.
+
+        /**
+         * Close - does nothing.  It is the responsibility of the owner
+         * of the socket object to close it.
          * @throws CMSException
-		 */
-		virtual void close() throw( cms::CMSException ){}
-        
+         */
+        virtual void close() throw( cms::CMSException );
+
         /**
          * Not supported.
          * @throws an UnsupportedOperationException.
-         */ 
-        virtual std::size_t skip( std::size_t num ) 
-            throw ( io::IOException, 
+         */
+        virtual std::size_t skip( std::size_t num )
+            throw ( io::IOException,
                     exceptions::UnsupportedOperationException );
-        
-	};
-	
+
+    };
+
 }}
 
 #endif /*ACTIVEMQ_NETWORK_SOCKETINPUTSTREAM_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.cpp Wed May  2 16:47:15 2007
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "SocketOutputStream.h"
 #include <activemq/util/Config.h>
 #include <activemq/util/Character.h>
@@ -48,12 +48,16 @@
 SocketOutputStream::SocketOutputStream( Socket::SocketHandle socket )
 {
     this->socket = socket;
-    //this->debug = true;
+    this->closed = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-SocketOutputStream::~SocketOutputStream()
-{
+SocketOutputStream::~SocketOutputStream() {
+    close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SocketOutputStream::close() throw( cms::CMSException ) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -63,20 +67,20 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void SocketOutputStream::write( const unsigned char* buffer, std::size_t len ) 
+void SocketOutputStream::write( const unsigned char* buffer, std::size_t len )
     throw (IOException)
 {
     std::size_t remaining = len;
     int sendOpts = AMQ_SEND_OPTS;
 
-    while( remaining > 0 )
+    while( remaining > 0 && !closed )
     {
-        int length = ::send( socket, (const char*)buffer, (int)remaining, sendOpts );      	
-        if( length == -1 ){
-            throw IOException( __FILE__, __LINE__, 
+        int length = ::send( socket, (const char*)buffer, (int)remaining, sendOpts );
+        if( length == -1 && !closed ){
+            throw IOException( __FILE__, __LINE__,
                 "activemq::io::SocketOutputStream::write - %s", SocketError::getErrorString().c_str() );
         }
-         
+
         buffer+=length;
         remaining -= length;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/SocketOutputStream.h Wed May  2 16:47:15 2007
@@ -17,14 +17,14 @@
 
 #ifndef ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
 #define ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_
- 
+
 #include <activemq/io/OutputStream.h>
 #include <activemq/network/Socket.h>
 #include <activemq/concurrent/Mutex.h>
 
 namespace activemq{
 namespace network{
-      
+
     /**
      * Output stream for performing write operations
      * on a socket.
@@ -32,46 +32,38 @@
     class SocketOutputStream : public io::OutputStream
     {
     private:
-   
+
         // The socket.
         Socket::SocketHandle socket;
         concurrent::Mutex mutex;
-        //bool debug;
-      
+        bool closed;
+
     public:
-   
+
         /**
          * Constructor.
          * @param socket the socket handle.
          */
         SocketOutputStream( Socket::SocketHandle socket );
-      
+
         virtual ~SocketOutputStream();
 
         /**
-         * Enables Debugging of Socket Data
-         * @param debug true to enable
-         */      
-        /*virtual void setDebug( bool debug ){
-            this->debug = debug;
-        }*/
-      
-        /**
          * Locks the object.
          * @throws ActiveMQException
          */
         virtual void lock() throw( exceptions::ActiveMQException ){
             mutex.lock();
         }
-   
+
         /**
          * Unlocks the object.
          * @throws ActiveMQException
          */
-        virtual void unlock() throw( exceptions::ActiveMQException ){   
+        virtual void unlock() throw( exceptions::ActiveMQException ){
             mutex.unlock();
         }
-       
+
         /**
          * Waits on a signal from this object, which is generated
          * by a call to Notify.  Must have this object locked before
@@ -81,7 +73,7 @@
         virtual void wait() throw( exceptions::ActiveMQException ){
             mutex.wait();
         }
-    
+
         /**
          * Waits on a signal from this object, which is generated
          * by a call to Notify.  Must have this object locked before
@@ -90,9 +82,9 @@
          * @param millisecs time in millisecsonds to wait, or WAIT_INIFINITE
          * @throws ActiveMQException
          */
-        virtual void wait( unsigned long millisecs ) 
+        virtual void wait( unsigned long millisecs )
             throw( exceptions::ActiveMQException ) {
-         
+
             mutex.wait( millisecs );
         }
 
@@ -105,7 +97,7 @@
         virtual void notify() throw( exceptions::ActiveMQException ){
             mutex.notify();
         }
-        
+
         /**
          * Signals the waiters on this object that it can now wake
          * up and continue.  Must have this object locked before
@@ -114,38 +106,38 @@
         virtual void notifyAll() throw( exceptions::ActiveMQException ){
             mutex.notifyAll();
          }
-       
+
         /**
          * Writes a single byte to the output stream.
          * @param c the byte.
          * @throws IOException thrown if an error occurs.
          */
         virtual void write( unsigned char c ) throw ( io::IOException );
-      
+
         /**
          * Writes an array of bytes to the output stream.
          * @param buffer The array of bytes to write.
          * @param len The number of bytes from the buffer to be written.
          * @throws IOException thrown if an error occurs.
          */
-        virtual void write( const unsigned char* buffer, 
+        virtual void write( const unsigned char* buffer,
                             std::size_t len ) throw ( io::IOException );
-      
+
         /**
          * Flush - does nothing.
          * @throws IOException
          */
         virtual void flush() throw ( io::IOException ){};
-      
+
         /**
          * Close - does nothing.  It is the responsibility of the owner
          * of the socket object to close it.
          * @throws CMSException
          */
-        virtual void close() throw( cms::CMSException ){} 
-        
+        virtual void close() throw( cms::CMSException );
+
     };
-   
+
 }}
 
 #endif /*ACTIVEMQ_NETWORK_SOCKETOUTPUTSTREAM_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/network/TcpSocket.cpp Wed May  2 16:47:15 2007
@@ -123,6 +123,18 @@
 {
     // No shutdown, just close - dont want blocking destructor.
     close();
+
+    // Destroy the input stream.
+    if( inputStream != NULL ){
+        delete inputStream;
+        inputStream = NULL;
+    }
+
+    // Destroy the output stream.
+    if( outputStream != NULL ){
+        delete outputStream;
+        outputStream = NULL;
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -149,7 +161,7 @@
         checkResult( (int)(socketHandle = ::socket(AF_INET, SOCK_STREAM, 0)) );
 
         // Check port value.
-        if (port <= 0 || port > 65535) {
+        if( port <= 0 || port > 65535 ) {
             close();
             throw SocketException ( __FILE__, __LINE__,
                 "Socket::connect- Port out of range: %d", port );
@@ -171,7 +183,7 @@
         // Resolve name
 #if defined(HAVE_STRUCT_ADDRINFO)
         addrinfo hints;
-        memset(&hints, 0, sizeof(addrinfo));
+        memset( &hints, 0, sizeof(addrinfo) );
         hints.ai_family = PF_INET;
         struct addrinfo *res_ptr = NULL;
 
@@ -196,10 +208,21 @@
                             ( const sockaddr * )&target_addr,
                             sizeof( target_addr ) ) );
 
+        // Destroy the input stream.
+        if( inputStream != NULL ){
+            delete inputStream;
+            inputStream = NULL;
+        }
+
+        // Destroy the output stream.
+        if( outputStream != NULL ){
+            delete outputStream;
+            outputStream = NULL;
+        }
+
         // Create an input/output stream for this socket.
         inputStream = new SocketInputStream( socketHandle );
         outputStream = new SocketOutputStream( socketHandle );
-
     }
     catch( SocketException& ex ) {
         ex.setMark( __FILE__, __LINE__);
@@ -215,16 +238,14 @@
 ////////////////////////////////////////////////////////////////////////////////
 void TcpSocket::close() throw( cms::CMSException )
 {
-    // Destroy the input stream.
+    // Close the input stream.
     if( inputStream != NULL ){
-        delete inputStream;
-        inputStream = NULL;
+        inputStream->close();
     }
 
-    // Destroy the output stream.
+    // Close the output stream.
     if( outputStream != NULL ){
-        delete outputStream;
-        outputStream = NULL;
+        outputStream->close();
     }
 
     if( isConnected() )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp?view=diff&rev=534658&r1=534657&r2=534658
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/filters/TcpTransport.cpp Wed May  2 16:47:15 2007
@@ -134,13 +134,13 @@
 
     try
     {
-        // Invoke the paren't close first.
-        TransportFilter::close();
-
         // Close the socket.
         if( socket != NULL ) {
             socket->close();
         }
+
+        // Invoke the paren't close first.
+        TransportFilter::close();
     }
     AMQ_CATCH_RETHROW( SocketException )
     AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, SocketException )