You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/03 23:23:42 UTC

svn commit: r1393773 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport: IOTransport.cpp IOTransport.h

Author: tabish
Date: Wed Oct  3 21:23:41 2012
New Revision: 1393773

URL: http://svn.apache.org/viewvc?rev=1393773&view=rev
Log:
Polish the code and ensure that the close methods of all members get called and their data is freed before an IOException is thrown from close().

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1393773&r1=1393772&r2=1393773&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Wed Oct  3 21:23:41 2012
@@ -32,150 +32,130 @@ using namespace activemq::wireformat;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport" )
+LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport")
 
 ////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport() : wireFormat(),
-                             listener(NULL),
-                             inputStream(NULL),
-                             outputStream(NULL),
-                             thread(),
-                             closed(false) {
+IOTransport::IOTransport() :
+    wireFormat(), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport( const Pointer<WireFormat>& wireFormat ) : wireFormat(wireFormat),
-                                                                    listener(NULL),
-                                                                    inputStream(NULL),
-                                                                    outputStream(NULL),
-                                                                    thread(),
-                                                                    closed(false) {
+IOTransport::IOTransport(const Pointer<WireFormat>& wireFormat) :
+    wireFormat(wireFormat), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-IOTransport::~IOTransport(){
-    try{
+IOTransport::~IOTransport() {
+    try {
         close();
     }
     AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void IOTransport::fire( decaf::lang::Exception& ex ){
-
-    if( this->listener != NULL && !this->closed ){
+void IOTransport::fire(decaf::lang::Exception& ex) {
 
-        try{
-            this->listener->onException( ex );
-        }catch( ... ){}
+    if (this->listener != NULL && !this->closed) {
+        try {
+            this->listener->onException(ex);
+        } catch (...) {
+        }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void IOTransport::fire( const Pointer<Command>& command ){
+void IOTransport::fire(const Pointer<Command>& command) {
 
-    try{
-        // Since the listener is responsible for freeing the memory,
-        // if there is no listener - free the command here.  Also if
-        // we have been closed then we don't deliver any messages that
+    try {
+        // If we have been closed then we don't deliver any messages that
         // might have sneaked in while we where closing.
-        if( this->listener == NULL || this->closed == true ){
+        if (this->listener == NULL || this->closed == true) {
             return;
         }
 
-        this->listener->onCommand( command );
+        this->listener->onCommand(command);
 
     }
     AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void IOTransport::oneway( const Pointer<Command>& command ) {
+void IOTransport::oneway(const Pointer<Command>& command) {
 
-    try{
+    try {
 
-        if( closed ){
-            throw IOException( __FILE__, __LINE__,
-                "IOTransport::oneway() - transport is closed!" );
+        if (closed) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is closed!");
         }
 
         // Make sure the thread has been started.
-        if( thread == NULL ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "IOTransport::oneway() - transport is not started" );
+        if (thread == NULL) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is not started");
         }
 
         // Make sure the command object is valid.
-        if( command == NULL ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "IOTransport::oneway() - attempting to write NULL command" );
+        if (command == NULL) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - attempting to write NULL command");
         }
 
         // Make sure we have an output stream to write to.
-        if( outputStream == NULL ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "IOTransport::oneway() - invalid output stream" );
+        if (outputStream == NULL) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - invalid output stream");
         }
 
-        synchronized( outputStream ){
+        synchronized(outputStream) {
             // Write the command to the output stream.
-            this->wireFormat->marshal( command, this, this->outputStream );
+            this->wireFormat->marshal(command, this, this->outputStream);
             this->outputStream->flush();
         }
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::start() {
 
-    try{
+    try {
 
         // Can't restart a closed transport.
-        if( closed ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "IOTransport::start() - transport is already closed - cannot restart" );
+        if (closed) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart");
         }
 
         // If it's already started, do nothing.
-        if( thread != NULL ){
+        if (thread != NULL) {
             return;
         }
 
         // Make sure all variables that we need have been set.
-        if( inputStream == NULL || outputStream == NULL || wireFormat.get() == NULL ){
-            throw IOException(
-                __FILE__, __LINE__,
-                "IOTransport::start() - "
-                "IO streams and wireFormat instances must be set before calling start" );
+        if (inputStream == NULL || outputStream == NULL || wireFormat.get() == NULL) {
+            throw IOException(__FILE__, __LINE__, "IOTransport::start() - "
+                    "IO streams and wireFormat instances must be set before calling start");
         }
 
         // Start the polling thread.
-        thread.reset( new Thread( this ) );
+        thread.reset(new Thread(this));
         thread->start();
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::stop() {
 
-    try{
+    try {
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW( IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+    AMQ_CATCHALL_THROW( IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -188,7 +168,9 @@ void IOTransport::close() {
 
     public:
 
-        Finalizer(Pointer<Thread> target) : target(target) {}
+        Finalizer(Pointer<Thread> target) :
+                target(target) {
+        }
         ~Finalizer() {
             try {
                 target->join();
@@ -204,7 +186,6 @@ void IOTransport::close() {
             return;
         }
 
-
         // Mark this transport as closed.
         closed = true;
 
@@ -213,75 +194,82 @@ void IOTransport::close() {
         // No need to fire anymore async events now.
         this->listener = NULL;
 
+        IOException error;
+        bool hasException = false;
+
         // We have to close the input stream before we stop the thread.  this will
         // force us to wake up the thread if it's stuck in a read (which is likely).
         // Otherwise, the join that follows will block forever.
-        if (inputStream != NULL) {
-            inputStream->close();
-            inputStream = NULL;
+        try {
+            if (inputStream != NULL) {
+                inputStream->close();
+                inputStream = NULL;
+            }
+        } catch (IOException& ex) {
+            error = ex;
+            error.setMark(__FILE__, __LINE__);
+            hasException = true;
         }
 
-        // Close the output stream.
-        if (outputStream != NULL) {
-            outputStream->close();
-            outputStream = NULL;
+        try {
+            // Close the output stream.
+            if (outputStream != NULL) {
+                outputStream->close();
+                outputStream = NULL;
+            }
+        } catch (IOException& ex) {
+            if (!hasException) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
         }
 
         // Clear the WireFormat so we can't use it anymore
         this->wireFormat.reset(NULL);
+
+        if (hasException) {
+            throw error;
+        }
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW( IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+    AMQ_CATCHALL_THROW( IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::run() {
 
-    try{
+    try {
 
-        while( !closed ){
+        while (!closed) {
 
             // Read the next command from the input stream.
-            Pointer<Command> command( wireFormat->unmarshal( this, this->inputStream ) );
+            Pointer<Command> command(wireFormat->unmarshal(this, this->inputStream));
 
             // Notify the listener.
-            fire( command );
+            fire(command);
         }
-    }
-    catch( exceptions::ActiveMQException& ex ){
-        ex.setMark( __FILE__, __LINE__ );
-        fire( ex );
-    }
-    catch( decaf::lang::Exception& ex ){
-        exceptions::ActiveMQException exl( ex );
-        exl.setMark( __FILE__, __LINE__ );
-        fire( exl );
-    }
-    catch( ... ){
-
-        exceptions::ActiveMQException ex(
-            __FILE__, __LINE__,
-            "IOTransport::run - caught unknown exception" );
-
-        LOGDECAF_WARN(logger, ex.getStackTraceString() );
-
-        fire( ex );
+    } catch (exceptions::ActiveMQException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        fire(ex);
+    } catch (decaf::lang::Exception& ex) {
+        exceptions::ActiveMQException exl(ex);
+        exl.setMark(__FILE__, __LINE__);
+        fire(exl);
+    } catch (...) {
+        exceptions::ActiveMQException ex(__FILE__, __LINE__, "IOTransport::run - caught unknown exception");
+        LOGDECAF_WARN(logger, ex.getStackTraceString());
+        fire(ex);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request( const Pointer<Command>& command AMQCPP_UNUSED ) {
-
-    throw decaf::lang::exceptions::UnsupportedOperationException(
-        __FILE__, __LINE__,
-        "IOTransport::request() - unsupported operation" );
+Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED) {
+    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request( const Pointer<Command>& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED ) {
-
-    throw decaf::lang::exceptions::UnsupportedOperationException(
-        __FILE__, __LINE__,
-        "IOTransport::request() - unsupported operation" );
+Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
+    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1393773&r1=1393772&r2=1393773&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Wed Oct  3 21:23:41 2012
@@ -33,8 +33,8 @@
 #include <decaf/util/logging/LoggerDefines.h>
 #include <memory>
 
-namespace activemq{
-namespace transport{
+namespace activemq {
+namespace transport {
 
     using decaf::lang::Pointer;
     using activemq::commands::Command;
@@ -55,7 +55,7 @@ namespace transport{
     class AMQCPP_API IOTransport : public Transport,
                                    public decaf::lang::Runnable {
 
-        LOGDECAF_DECLARE( logger )
+        LOGDECAF_DECLARE(logger)
 
     private:
 
@@ -91,8 +91,8 @@ namespace transport{
 
     private:
 
-        IOTransport( const IOTransport& );
-        IOTransport& operator= ( const IOTransport& );
+        IOTransport(const IOTransport&);
+        IOTransport& operator=(const IOTransport&);
 
     private:
 
@@ -100,13 +100,13 @@ namespace transport{
          * Notify the exception listener
          * @param ex the exception to send
          */
-        void fire( decaf::lang::Exception& ex );
+        void fire(decaf::lang::Exception& ex);
 
         /**
          * Notify the command listener.
          * @param command the command the send
          */
-        void fire( const Pointer<Command>& command );
+        void fire(const Pointer<Command>& command);
 
     public:
 
@@ -122,7 +122,7 @@ namespace transport{
          * @param wireFormat
          *        Data encoder / decoder to use when reading and writing.
          */
-        IOTransport( const Pointer<wireformat::WireFormat>& wireFormat );
+        IOTransport(const Pointer<wireformat::WireFormat>& wireFormat);
 
         virtual ~IOTransport();
 
@@ -132,7 +132,7 @@ namespace transport{
          * @param is
          *      The InputStream that will be read from by this object.
          */
-        virtual void setInputStream( decaf::io::DataInputStream* is ) {
+        virtual void setInputStream(decaf::io::DataInputStream* is) {
             this->inputStream = is;
         }
 
@@ -142,37 +142,37 @@ namespace transport{
          * @param os
          *      The OuputStream that will be written to by this object.
          */
-        virtual void setOutputStream( decaf::io::DataOutputStream* os ) {
+        virtual void setOutputStream(decaf::io::DataOutputStream* os) {
             this->outputStream = os;
         }
 
-    public:  //Transport methods
+    public:  // Transport methods
 
-        virtual void oneway( const Pointer<Command>& command );
+        virtual void oneway(const Pointer<Command>& command);
 
         /**
          * {@inheritDoc}
          *
          * This method always thrown an UnsupportedOperationException.
          */
-        virtual Pointer<Response> request( const Pointer<Command>& command );
+        virtual Pointer<Response> request(const Pointer<Command>& command);
 
         /**
          * {@inheritDoc}
          *
          * This method always thrown an UnsupportedOperationException.
          */
-        virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+        virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
 
         virtual Pointer<wireformat::WireFormat> getWireFormat() const {
-        	return this->wireFormat;
+            return this->wireFormat;
         }
 
-        virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ){
+        virtual void setWireFormat(const Pointer<wireformat::WireFormat>& wireFormat) {
             this->wireFormat = wireFormat;
         }
 
-        virtual void setTransportListener( TransportListener* listener ){
+        virtual void setTransportListener(TransportListener* listener) {
             this->listener = listener;
         }
 
@@ -186,8 +186,8 @@ namespace transport{
 
         virtual void close();
 
-        virtual Transport* narrow( const std::type_info& typeId ) {
-            if( typeid( *this ) == typeId ) {
+        virtual Transport* narrow(const std::type_info& typeId) {
+            if (typeid(*this) == typeId) {
                 return this;
             }
 
@@ -218,8 +218,7 @@ namespace transport{
             return false;
         }
 
-        virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
-                                 const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+        virtual void updateURIs(bool rebalance AMQCPP_UNUSED, const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED) {
             throw decaf::io::IOException();
         }
 
@@ -228,7 +227,7 @@ namespace transport{
          *
          * This method does nothing in this subclass.
          */
-        virtual void reconnect( const decaf::net::URI& uri AMQCPP_UNUSED ) {}
+        virtual void reconnect(const decaf::net::URI& uri AMQCPP_UNUSED) {}
 
     public:  // Runnable methods.