You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/03 23:23:42 UTC
svn commit: r1393773 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport:
IOTransport.cpp IOTransport.h
Author: tabish
Date: Wed Oct 3 21:23:41 2012
New Revision: 1393773
URL: http://svn.apache.org/viewvc?rev=1393773&view=rev
Log:
Polish the code and ensure that all close methods for members gets called and data is freed before throwing IOException in close()
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1393773&r1=1393772&r2=1393773&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Wed Oct 3 21:23:41 2012
@@ -32,150 +32,130 @@ using namespace activemq::wireformat;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
-LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport" )
+LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport")
////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport() : wireFormat(),
- listener(NULL),
- inputStream(NULL),
- outputStream(NULL),
- thread(),
- closed(false) {
+IOTransport::IOTransport() :
+ wireFormat(), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
}
////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport( const Pointer<WireFormat>& wireFormat ) : wireFormat(wireFormat),
- listener(NULL),
- inputStream(NULL),
- outputStream(NULL),
- thread(),
- closed(false) {
+IOTransport::IOTransport(const Pointer<WireFormat>& wireFormat) :
+ wireFormat(wireFormat), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
}
////////////////////////////////////////////////////////////////////////////////
-IOTransport::~IOTransport(){
- try{
+IOTransport::~IOTransport() {
+ try {
close();
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::fire( decaf::lang::Exception& ex ){
-
- if( this->listener != NULL && !this->closed ){
+void IOTransport::fire(decaf::lang::Exception& ex) {
- try{
- this->listener->onException( ex );
- }catch( ... ){}
+ if (this->listener != NULL && !this->closed) {
+ try {
+ this->listener->onException(ex);
+ } catch (...) {
+ }
}
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::fire( const Pointer<Command>& command ){
+void IOTransport::fire(const Pointer<Command>& command) {
- try{
- // Since the listener is responsible for freeing the memory,
- // if there is no listener - free the command here. Also if
- // we have been closed then we don't deliver any messages that
+ try {
+ // If we have been closed then we don't deliver any messages that
// might have sneaked in while we where closing.
- if( this->listener == NULL || this->closed == true ){
+ if (this->listener == NULL || this->closed == true) {
return;
}
- this->listener->onCommand( command );
+ this->listener->onCommand(command);
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::oneway( const Pointer<Command>& command ) {
+void IOTransport::oneway(const Pointer<Command>& command) {
- try{
+ try {
- if( closed ){
- throw IOException( __FILE__, __LINE__,
- "IOTransport::oneway() - transport is closed!" );
+ if (closed) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is closed!");
}
// Make sure the thread has been started.
- if( thread == NULL ){
- throw IOException(
- __FILE__, __LINE__,
- "IOTransport::oneway() - transport is not started" );
+ if (thread == NULL) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is not started");
}
// Make sure the command object is valid.
- if( command == NULL ){
- throw IOException(
- __FILE__, __LINE__,
- "IOTransport::oneway() - attempting to write NULL command" );
+ if (command == NULL) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - attempting to write NULL command");
}
// Make sure we have an output stream to write to.
- if( outputStream == NULL ){
- throw IOException(
- __FILE__, __LINE__,
- "IOTransport::oneway() - invalid output stream" );
+ if (outputStream == NULL) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - invalid output stream");
}
- synchronized( outputStream ){
+ synchronized(outputStream) {
// Write the command to the output stream.
- this->wireFormat->marshal( command, this, this->outputStream );
+ this->wireFormat->marshal(command, this, this->outputStream);
this->outputStream->flush();
}
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::start() {
- try{
+ try {
// Can't restart a closed transport.
- if( closed ){
- throw IOException(
- __FILE__, __LINE__,
- "IOTransport::start() - transport is already closed - cannot restart" );
+ if (closed) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart");
}
// If it's already started, do nothing.
- if( thread != NULL ){
+ if (thread != NULL) {
return;
}
// Make sure all variables that we need have been set.
- if( inputStream == NULL || outputStream == NULL || wireFormat.get() == NULL ){
- throw IOException(
- __FILE__, __LINE__,
- "IOTransport::start() - "
- "IO streams and wireFormat instances must be set before calling start" );
+ if (inputStream == NULL || outputStream == NULL || wireFormat.get() == NULL) {
+ throw IOException(__FILE__, __LINE__, "IOTransport::start() - "
+ "IO streams and wireFormat instances must be set before calling start");
}
// Start the polling thread.
- thread.reset( new Thread( this ) );
+ thread.reset(new Thread(this));
thread->start();
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::stop() {
- try{
+ try {
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -188,7 +168,9 @@ void IOTransport::close() {
public:
- Finalizer(Pointer<Thread> target) : target(target) {}
+ Finalizer(Pointer<Thread> target) :
+ target(target) {
+ }
~Finalizer() {
try {
target->join();
@@ -204,7 +186,6 @@ void IOTransport::close() {
return;
}
-
// Mark this transport as closed.
closed = true;
@@ -213,75 +194,82 @@ void IOTransport::close() {
// No need to fire anymore async events now.
this->listener = NULL;
+ IOException error;
+ bool hasException = false;
+
// We have to close the input stream before we stop the thread. this will
// force us to wake up the thread if it's stuck in a read (which is likely).
// Otherwise, the join that follows will block forever.
- if (inputStream != NULL) {
- inputStream->close();
- inputStream = NULL;
+ try {
+ if (inputStream != NULL) {
+ inputStream->close();
+ inputStream = NULL;
+ }
+ } catch (IOException& ex) {
+ error = ex;
+ error.setMark(__FILE__, __LINE__);
+ hasException = true;
}
- // Close the output stream.
- if (outputStream != NULL) {
- outputStream->close();
- outputStream = NULL;
+ try {
+ // Close the output stream.
+ if (outputStream != NULL) {
+ outputStream->close();
+ outputStream = NULL;
+ }
+ } catch (IOException& ex) {
+ if (!hasException) {
+ error = ex;
+ error.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
}
// Clear the WireFormat so we can't use it anymore
this->wireFormat.reset(NULL);
+
+ if (hasException) {
+ throw error;
+ }
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
void IOTransport::run() {
- try{
+ try {
- while( !closed ){
+ while (!closed) {
// Read the next command from the input stream.
- Pointer<Command> command( wireFormat->unmarshal( this, this->inputStream ) );
+ Pointer<Command> command(wireFormat->unmarshal(this, this->inputStream));
// Notify the listener.
- fire( command );
+ fire(command);
}
- }
- catch( exceptions::ActiveMQException& ex ){
- ex.setMark( __FILE__, __LINE__ );
- fire( ex );
- }
- catch( decaf::lang::Exception& ex ){
- exceptions::ActiveMQException exl( ex );
- exl.setMark( __FILE__, __LINE__ );
- fire( exl );
- }
- catch( ... ){
-
- exceptions::ActiveMQException ex(
- __FILE__, __LINE__,
- "IOTransport::run - caught unknown exception" );
-
- LOGDECAF_WARN(logger, ex.getStackTraceString() );
-
- fire( ex );
+ } catch (exceptions::ActiveMQException& ex) {
+ ex.setMark(__FILE__, __LINE__);
+ fire(ex);
+ } catch (decaf::lang::Exception& ex) {
+ exceptions::ActiveMQException exl(ex);
+ exl.setMark(__FILE__, __LINE__);
+ fire(exl);
+ } catch (...) {
+ exceptions::ActiveMQException ex(__FILE__, __LINE__, "IOTransport::run - caught unknown exception");
+ LOGDECAF_WARN(logger, ex.getStackTraceString());
+ fire(ex);
}
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request( const Pointer<Command>& command AMQCPP_UNUSED ) {
-
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__, __LINE__,
- "IOTransport::request() - unsupported operation" );
+Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED) {
+ throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request( const Pointer<Command>& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED ) {
-
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__, __LINE__,
- "IOTransport::request() - unsupported operation" );
+Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
+ throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1393773&r1=1393772&r2=1393773&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Wed Oct 3 21:23:41 2012
@@ -33,8 +33,8 @@
#include <decaf/util/logging/LoggerDefines.h>
#include <memory>
-namespace activemq{
-namespace transport{
+namespace activemq {
+namespace transport {
using decaf::lang::Pointer;
using activemq::commands::Command;
@@ -55,7 +55,7 @@ namespace transport{
class AMQCPP_API IOTransport : public Transport,
public decaf::lang::Runnable {
- LOGDECAF_DECLARE( logger )
+ LOGDECAF_DECLARE(logger)
private:
@@ -91,8 +91,8 @@ namespace transport{
private:
- IOTransport( const IOTransport& );
- IOTransport& operator= ( const IOTransport& );
+ IOTransport(const IOTransport&);
+ IOTransport& operator=(const IOTransport&);
private:
@@ -100,13 +100,13 @@ namespace transport{
* Notify the exception listener
* @param ex the exception to send
*/
- void fire( decaf::lang::Exception& ex );
+ void fire(decaf::lang::Exception& ex);
/**
* Notify the command listener.
* @param command the command the send
*/
- void fire( const Pointer<Command>& command );
+ void fire(const Pointer<Command>& command);
public:
@@ -122,7 +122,7 @@ namespace transport{
* @param wireFormat
* Data encoder / decoder to use when reading and writing.
*/
- IOTransport( const Pointer<wireformat::WireFormat>& wireFormat );
+ IOTransport(const Pointer<wireformat::WireFormat>& wireFormat);
virtual ~IOTransport();
@@ -132,7 +132,7 @@ namespace transport{
* @param is
* The InputStream that will be read from by this object.
*/
- virtual void setInputStream( decaf::io::DataInputStream* is ) {
+ virtual void setInputStream(decaf::io::DataInputStream* is) {
this->inputStream = is;
}
@@ -142,37 +142,37 @@ namespace transport{
* @param os
* The OuputStream that will be written to by this object.
*/
- virtual void setOutputStream( decaf::io::DataOutputStream* os ) {
+ virtual void setOutputStream(decaf::io::DataOutputStream* os) {
this->outputStream = os;
}
- public: //Transport methods
+ public: // Transport methods
- virtual void oneway( const Pointer<Command>& command );
+ virtual void oneway(const Pointer<Command>& command);
/**
* {@inheritDoc}
*
* This method always thrown an UnsupportedOperationException.
*/
- virtual Pointer<Response> request( const Pointer<Command>& command );
+ virtual Pointer<Response> request(const Pointer<Command>& command);
/**
* {@inheritDoc}
*
* This method always thrown an UnsupportedOperationException.
*/
- virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+ virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
virtual Pointer<wireformat::WireFormat> getWireFormat() const {
- return this->wireFormat;
+ return this->wireFormat;
}
- virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ){
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat>& wireFormat) {
this->wireFormat = wireFormat;
}
- virtual void setTransportListener( TransportListener* listener ){
+ virtual void setTransportListener(TransportListener* listener) {
this->listener = listener;
}
@@ -186,8 +186,8 @@ namespace transport{
virtual void close();
- virtual Transport* narrow( const std::type_info& typeId ) {
- if( typeid( *this ) == typeId ) {
+ virtual Transport* narrow(const std::type_info& typeId) {
+ if (typeid(*this) == typeId) {
return this;
}
@@ -218,8 +218,7 @@ namespace transport{
return false;
}
- virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
- const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+ virtual void updateURIs(bool rebalance AMQCPP_UNUSED, const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED) {
throw decaf::io::IOException();
}
@@ -228,7 +227,7 @@ namespace transport{
*
* This method does nothing in this subclass.
*/
- virtual void reconnect( const decaf::net::URI& uri AMQCPP_UNUSED ) {}
+ virtual void reconnect(const decaf::net::URI& uri AMQCPP_UNUSED) {}
public: // Runnable methods.