You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/04 00:21:10 UTC

svn commit: r1393798 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp

Author: tabish
Date: Wed Oct  3 22:21:10 2012
New Revision: 1393798

URL: http://svn.apache.org/viewvc?rev=1393798&view=rev
Log:
Polish up some of the code a bit and improve the destructor implementations.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=1393798&r1=1393797&r2=1393798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Wed Oct  3 22:21:10 2012
@@ -54,8 +54,8 @@ namespace inactivity{
     class InactivityMonitorData {
     private:
 
-        InactivityMonitorData( const InactivityMonitorData& );
-        InactivityMonitorData operator= ( const InactivityMonitorData& );
+        InactivityMonitorData(const InactivityMonitorData&);
+        InactivityMonitorData operator=(const InactivityMonitorData&);
 
     public:
 
@@ -130,17 +130,17 @@ namespace inactivity{
 
     private:
 
-        AsyncSignalReadErrorkTask( const AsyncSignalReadErrorkTask& );
-        AsyncSignalReadErrorkTask operator= ( const AsyncSignalReadErrorkTask& );
+        AsyncSignalReadErrorkTask(const AsyncSignalReadErrorkTask&);
+        AsyncSignalReadErrorkTask operator=(const AsyncSignalReadErrorkTask&);
 
     public:
 
-        AsyncSignalReadErrorkTask( InactivityMonitor* parent, const std::string& remote ) :
+        AsyncSignalReadErrorkTask(InactivityMonitor* parent, const std::string& remote) :
             parent(parent), remote(remote), failed() {
         }
 
-        void setFailed( bool failed ) {
-            this->failed.set( failed );
+        void setFailed(bool failed) {
+            this->failed.set(failed);
         }
 
         virtual bool isPending() const {
@@ -149,13 +149,10 @@ namespace inactivity{
 
         virtual bool iterate() {
 
-            if( this->failed.compareAndSet( true, false ) ) {
-
-                IOException ex (
-                    __FILE__, __LINE__,
-                    ( std::string( "Channel was inactive for too long: " ) + remote ).c_str() );
-
-                this->parent->onException( ex );
+            if (this->failed.compareAndSet(true, false)) {
+                IOException ex(__FILE__, __LINE__,
+                    (std::string("Channel was inactive for too long: ") + remote).c_str());
+                this->parent->onException(ex);
             }
 
             return this->failed.get();
@@ -171,13 +168,12 @@ namespace inactivity{
 
     private:
 
-        AsyncWriteTask( const AsyncWriteTask& );
-        AsyncWriteTask operator= ( const AsyncWriteTask& );
+        AsyncWriteTask(const AsyncWriteTask&);
+        AsyncWriteTask operator=(const AsyncWriteTask&);
 
     public:
 
-        AsyncWriteTask( InactivityMonitor* parent ) : parent( parent ), write() {
-        }
+        AsyncWriteTask(InactivityMonitor* parent) : parent(parent), write() {}
 
         void setWrite( bool write ) {
             this->write.set( write );
@@ -189,15 +185,13 @@ namespace inactivity{
 
         virtual bool iterate() {
 
-            if( this->write.compareAndSet( true, false ) &&
-                this->parent->members->monitorStarted.get() ) {
-
+            if (this->write.compareAndSet(true, false) && this->parent->members->monitorStarted.get()) {
                 try {
-                    Pointer<KeepAliveInfo> info( new KeepAliveInfo() );
-                    info->setResponseRequired( this->parent->members->keepAliveResponseRequired );
-                    this->parent->oneway( info );
-                } catch( IOException& e ) {
-                    this->parent->onException( e );
+                    Pointer<KeepAliveInfo> info(new KeepAliveInfo());
+                    info->setResponseRequired(this->parent->members->keepAliveResponseRequired);
+                    this->parent->oneway(info);
+                } catch (IOException& e) {
+                    this->parent->onException(e);
                 }
             }
 
@@ -208,8 +202,8 @@ namespace inactivity{
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
-InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat )
-:   TransportFilter( next ), members( new InactivityMonitorData() ) {
+InactivityMonitor::InactivityMonitor(const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat) :
+    TransportFilter(next), members(new InactivityMonitorData()) {
 
     this->members->wireFormat = wireFormat;
     this->members->monitorStarted.set(false);
@@ -225,10 +219,8 @@ InactivityMonitor::InactivityMonitor( co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next,
-                                      const decaf::util::Properties& properties,
-                                      const Pointer<wireformat::WireFormat>& wireFormat )
-:   TransportFilter( next ), members( new InactivityMonitorData() ) {
+InactivityMonitor::InactivityMonitor(const Pointer<Transport>& next, const decaf::util::Properties& properties, const Pointer<wireformat::WireFormat>& wireFormat) :
+    TransportFilter(next), members(new InactivityMonitorData()) {
 
     this->members->wireFormat = wireFormat;
     this->members->monitorStarted.set(false);
@@ -240,14 +232,17 @@ InactivityMonitor::InactivityMonitor( co
     this->members->readCheckTime = 0;
     this->members->writeCheckTime = 0;
     this->members->initialDelayTime = 0;
-    this->members->keepAliveResponseRequired =
-        Boolean::parseBoolean( properties.getProperty( "keepAliveResponseRequired", "false" ) );
+    this->members->keepAliveResponseRequired = Boolean::parseBoolean(properties.getProperty("keepAliveResponseRequired", "false"));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 InactivityMonitor::~InactivityMonitor() {
-    try{
+    try {
         this->stopMonitorThreads();
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
         delete this->members;
     }
     AMQ_CATCHALL_NOTHROW()
@@ -259,7 +254,7 @@ long long InactivityMonitor::getReadChec
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setReadCheckTime( long long value ) {
+void InactivityMonitor::setReadCheckTime(long long value) {
     this->members->readCheckTime = value;
 }
 
@@ -269,7 +264,7 @@ long long InactivityMonitor::getWriteChe
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setWriteCheckTime( long long value ) {
+void InactivityMonitor::setWriteCheckTime(long long value) {
     this->members->writeCheckTime = value;
 }
 
@@ -279,7 +274,7 @@ long long InactivityMonitor::getInitialD
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setInitialDelayTime( long long value ) const {
+void InactivityMonitor::setInitialDelayTime(long long value) const {
     this->members->initialDelayTime = value;
 }
 
@@ -289,142 +284,140 @@ bool InactivityMonitor::isKeepAliveRespo
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setKeepAliveResponseRequired( bool value ) {
+void InactivityMonitor::setKeepAliveResponseRequired(bool value) {
     this->members->keepAliveResponseRequired = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::close() {
-    try{
+    try {
         stopMonitorThreads();
         TransportFilter::close();
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::onException( const decaf::lang::Exception& ex ) {
+void InactivityMonitor::onException(const decaf::lang::Exception& ex) {
 
-    if( this->members->failed.compareAndSet( false, true ) ) {
+    if (this->members->failed.compareAndSet(false, true)) {
         stopMonitorThreads();
-        TransportFilter::onException( ex );
+        TransportFilter::onException(ex);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::onCommand( const Pointer<Command>& command ) {
+void InactivityMonitor::onCommand(const Pointer<Command>& command) {
 
-    this->members->commandReceived.set( true );
-    this->members->inRead.set( true );
+    this->members->commandReceived.set(true);
+    this->members->inRead.set(true);
 
     try {
 
-        if( command->isWireFormatInfo() ) {
-            synchronized( &this->members->monitor ) {
+        if (command->isWireFormatInfo()) {
+            synchronized(&this->members->monitor) {
 
                 this->members->remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
                 try {
                     startMonitorThreads();
-                } catch( IOException& e ) {
-                    onException( e );
+                } catch (IOException& e) {
+                    onException(e);
                 }
             }
         }
 
-        TransportFilter::onCommand( command );
+        TransportFilter::onCommand(command);
 
-        this->members->inRead.set( false );
+        this->members->inRead.set(false);
 
-    } catch( Exception& ex ) {
-        this->members->inRead.set( false );
-        ex.setMark( __FILE__, __LINE__ );
+    } catch (Exception& ex) {
+        this->members->inRead.set(false);
+        ex.setMark(__FILE__, __LINE__);
         throw ex;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::oneway( const Pointer<Command>& command ) {
+void InactivityMonitor::oneway(const Pointer<Command>& command) {
 
-    try{
-        // Disable inactivity monitoring while processing a command.  Synchronize this
-        // method - its not synchronized
-        // further down the transport stack and gets called by more
-        // than one thread  by this class
-        synchronized( &this->members->inWriteMutex ) {
-            this->members->inWrite.set( true );
+    try {
+        // Disable inactivity monitoring while processing a command. Synchronize this
+        // method - it's not synchronized further down the transport stack and gets called
+        // by more than one thread by this class
+        synchronized(&this->members->inWriteMutex) {
+            this->members->inWrite.set(true);
             try {
 
-                if( this->members->failed.get() ) {
-                    throw IOException(
-                        __FILE__, __LINE__,
-                        ( std::string( "Channel was inactive for too long: " ) + next->getRemoteAddress() ).c_str() );
+                if (this->members->failed.get()) {
+                    throw IOException(__FILE__, __LINE__,
+                        (std::string("Channel was inactive for too long: ") + next->getRemoteAddress()).c_str());
                 }
 
-                if( command->isWireFormatInfo() ) {
+                if (command->isWireFormatInfo()) {
                     synchronized( &this->members->monitor ) {
                         this->members->localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
                         startMonitorThreads();
                     }
                 }
 
-                this->next->oneway( command );
+                this->next->oneway(command);
 
-                this->members->commandSent.set( true );
-                this->members->inWrite.set( false );
+                this->members->commandSent.set(true);
+                this->members->inWrite.set(false);
 
-            } catch( Exception& ex ) {
-                this->members->commandSent.set( true );
-                this->members->inWrite.set( false );
+            } catch (Exception& ex) {
+                this->members->commandSent.set(true);
+                this->members->inWrite.set(false);
 
-                ex.setMark( __FILE__, __LINE__ );
+                ex.setMark(__FILE__, __LINE__);
                 throw ex;
             }
         }
     }
-    AMQ_CATCH_RETHROW( IOException )
-    AMQ_CATCH_RETHROW( UnsupportedOperationException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
-    AMQ_CATCHALL_THROW( IOException )
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_RETHROW(UnsupportedOperationException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool InactivityMonitor::allowReadCheck( long long elapsed ) {
-    return elapsed > ( this->members->readCheckTime * 9 / 10 );
+bool InactivityMonitor::allowReadCheck(long long elapsed) {
+    return elapsed > (this->members->readCheckTime * 9 / 10);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::readCheck() {
 
-    if( this->members->inRead.get() || this->members->wireFormat->inReceive() ) {
+    if (this->members->inRead.get() || this->members->wireFormat->inReceive()) {
         return;
     }
 
-    if( !this->members->commandReceived.get() ) {
+    if (!this->members->commandReceived.get()) {
 
         // Set the failed state on our async Read Failure Task and wakeup its runner.
-        this->members->asyncReadTask->setFailed( true );
+        this->members->asyncReadTask->setFailed(true);
         this->members->asyncTasks->wakeup();
     }
 
-    this->members->commandReceived.set( false );
+    this->members->commandReceived.set(false);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::writeCheck() {
 
-    if( this->members->inWrite.get() ) {
+    if (this->members->inWrite.get()) {
         return;
     }
 
-    if( !this->members->commandSent.get() ) {
+    if (!this->members->commandSent.get()) {
 
-        this->members->asyncWriteTask->setWrite( true );
+        this->members->asyncWriteTask->setWrite(true);
         this->members->asyncTasks->wakeup();
     }
 
-    this->members->commandSent.set( false );
+    this->members->commandSent.set(false);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -432,43 +425,38 @@ void InactivityMonitor::startMonitorThre
 
     synchronized( &this->members->monitor ) {
 
-        if( this->members->monitorStarted.get() ) {
+        if (this->members->monitorStarted.get()) {
             return;
         }
-        if( this->members->localWireFormatInfo == NULL ) {
+        if (this->members->localWireFormatInfo == NULL) {
             return;
         }
-        if( this->members->remoteWireFormatInfo == NULL ) {
+        if (this->members->remoteWireFormatInfo == NULL) {
             return;
         }
 
-        this->members->asyncTasks.reset( new CompositeTaskRunner() );
-        this->members->asyncReadTask.reset( new AsyncSignalReadErrorkTask( this, this->getRemoteAddress() ) );
-        this->members->asyncWriteTask.reset( new AsyncWriteTask( this ) );
-
-        this->members->asyncTasks->addTask( this->members->asyncReadTask.get() );
-        this->members->asyncTasks->addTask( this->members->asyncWriteTask.get() );
-
-        this->members->readCheckTime =
-            Math::min( this->members->localWireFormatInfo->getMaxInactivityDuration(),
-                        this->members->remoteWireFormatInfo->getMaxInactivityDuration() );
-
-        this->members->initialDelayTime =
-            Math::min( this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
-                       this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
-
-        if( this->members->readCheckTime > 0 ) {
-
-            this->members->monitorStarted.set( true );
-            this->members->writeCheckerTask.reset( new WriteChecker( this ) );
-            this->members->readCheckerTask.reset( new ReadChecker( this ) );
-            this->members->writeCheckTime = this->members->readCheckTime > 3 ?
-                                                this->members->readCheckTime / 3 : this->members->readCheckTime;
-
-            this->members->writeCheckTimer.scheduleAtFixedRate(
-                this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime );
-            this->members->readCheckTimer.scheduleAtFixedRate(
-                this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime );
+        this->members->asyncTasks.reset(new CompositeTaskRunner());
+        this->members->asyncReadTask.reset(new AsyncSignalReadErrorkTask(this, this->getRemoteAddress()));
+        this->members->asyncWriteTask.reset(new AsyncWriteTask(this));
+
+        this->members->asyncTasks->addTask(this->members->asyncReadTask.get());
+        this->members->asyncTasks->addTask(this->members->asyncWriteTask.get());
+
+        this->members->readCheckTime = Math::min(this->members->localWireFormatInfo->getMaxInactivityDuration(),
+                this->members->remoteWireFormatInfo->getMaxInactivityDuration());
+
+        this->members->initialDelayTime = Math::min(this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
+                this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay());
+
+        if (this->members->readCheckTime > 0) {
+
+            this->members->monitorStarted.set(true);
+            this->members->writeCheckerTask.reset(new WriteChecker(this));
+            this->members->readCheckerTask.reset(new ReadChecker(this));
+            this->members->writeCheckTime = this->members->readCheckTime > 3 ? this->members->readCheckTime / 3 : this->members->readCheckTime;
+
+            this->members->writeCheckTimer.scheduleAtFixedRate(this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime);
+            this->members->readCheckTimer.scheduleAtFixedRate(this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime);
         }
     }
 }
@@ -476,9 +464,9 @@ void InactivityMonitor::startMonitorThre
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::stopMonitorThreads() {
 
-    synchronized( &this->members->monitor ) {
+    synchronized(&this->members->monitor) {
 
-        if( this->members->monitorStarted.compareAndSet( true, false ) ) {
+        if (this->members->monitorStarted.compareAndSet(true, false)) {
 
             this->members->readCheckerTask->cancel();
             this->members->writeCheckerTask->cancel();