You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/04 00:21:10 UTC
svn commit: r1393798 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
Author: tabish
Date: Wed Oct 3 22:21:10 2012
New Revision: 1393798
URL: http://svn.apache.org/viewvc?rev=1393798&view=rev
Log:
Polish up some of the code a bit, better destructor impls.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=1393798&r1=1393797&r2=1393798&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Wed Oct 3 22:21:10 2012
@@ -54,8 +54,8 @@ namespace inactivity{
class InactivityMonitorData {
private:
- InactivityMonitorData( const InactivityMonitorData& );
- InactivityMonitorData operator= ( const InactivityMonitorData& );
+ InactivityMonitorData(const InactivityMonitorData&);
+ InactivityMonitorData operator=(const InactivityMonitorData&);
public:
@@ -130,17 +130,17 @@ namespace inactivity{
private:
- AsyncSignalReadErrorkTask( const AsyncSignalReadErrorkTask& );
- AsyncSignalReadErrorkTask operator= ( const AsyncSignalReadErrorkTask& );
+ AsyncSignalReadErrorkTask(const AsyncSignalReadErrorkTask&);
+ AsyncSignalReadErrorkTask operator=(const AsyncSignalReadErrorkTask&);
public:
- AsyncSignalReadErrorkTask( InactivityMonitor* parent, const std::string& remote ) :
+ AsyncSignalReadErrorkTask(InactivityMonitor* parent, const std::string& remote) :
parent(parent), remote(remote), failed() {
}
- void setFailed( bool failed ) {
- this->failed.set( failed );
+ void setFailed(bool failed) {
+ this->failed.set(failed);
}
virtual bool isPending() const {
@@ -149,13 +149,10 @@ namespace inactivity{
virtual bool iterate() {
- if( this->failed.compareAndSet( true, false ) ) {
-
- IOException ex (
- __FILE__, __LINE__,
- ( std::string( "Channel was inactive for too long: " ) + remote ).c_str() );
-
- this->parent->onException( ex );
+ if (this->failed.compareAndSet(true, false)) {
+ IOException ex(__FILE__, __LINE__,
+ (std::string("Channel was inactive for too long: ") + remote).c_str());
+ this->parent->onException(ex);
}
return this->failed.get();
@@ -171,13 +168,12 @@ namespace inactivity{
private:
- AsyncWriteTask( const AsyncWriteTask& );
- AsyncWriteTask operator= ( const AsyncWriteTask& );
+ AsyncWriteTask(const AsyncWriteTask&);
+ AsyncWriteTask operator=(const AsyncWriteTask&);
public:
- AsyncWriteTask( InactivityMonitor* parent ) : parent( parent ), write() {
- }
+ AsyncWriteTask(InactivityMonitor* parent) : parent(parent), write() {}
void setWrite( bool write ) {
this->write.set( write );
@@ -189,15 +185,13 @@ namespace inactivity{
virtual bool iterate() {
- if( this->write.compareAndSet( true, false ) &&
- this->parent->members->monitorStarted.get() ) {
-
+ if (this->write.compareAndSet(true, false) && this->parent->members->monitorStarted.get()) {
try {
- Pointer<KeepAliveInfo> info( new KeepAliveInfo() );
- info->setResponseRequired( this->parent->members->keepAliveResponseRequired );
- this->parent->oneway( info );
- } catch( IOException& e ) {
- this->parent->onException( e );
+ Pointer<KeepAliveInfo> info(new KeepAliveInfo());
+ info->setResponseRequired(this->parent->members->keepAliveResponseRequired);
+ this->parent->oneway(info);
+ } catch (IOException& e) {
+ this->parent->onException(e);
}
}
@@ -208,8 +202,8 @@ namespace inactivity{
}}}
////////////////////////////////////////////////////////////////////////////////
-InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat )
-: TransportFilter( next ), members( new InactivityMonitorData() ) {
+InactivityMonitor::InactivityMonitor(const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat) :
+ TransportFilter(next), members(new InactivityMonitorData()) {
this->members->wireFormat = wireFormat;
this->members->monitorStarted.set(false);
@@ -225,10 +219,8 @@ InactivityMonitor::InactivityMonitor( co
}
////////////////////////////////////////////////////////////////////////////////
-InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next,
- const decaf::util::Properties& properties,
- const Pointer<wireformat::WireFormat>& wireFormat )
-: TransportFilter( next ), members( new InactivityMonitorData() ) {
+InactivityMonitor::InactivityMonitor(const Pointer<Transport>& next, const decaf::util::Properties& properties, const Pointer<wireformat::WireFormat>& wireFormat) :
+ TransportFilter(next), members(new InactivityMonitorData()) {
this->members->wireFormat = wireFormat;
this->members->monitorStarted.set(false);
@@ -240,14 +232,17 @@ InactivityMonitor::InactivityMonitor( co
this->members->readCheckTime = 0;
this->members->writeCheckTime = 0;
this->members->initialDelayTime = 0;
- this->members->keepAliveResponseRequired =
- Boolean::parseBoolean( properties.getProperty( "keepAliveResponseRequired", "false" ) );
+ this->members->keepAliveResponseRequired = Boolean::parseBoolean(properties.getProperty("keepAliveResponseRequired", "false"));
}
////////////////////////////////////////////////////////////////////////////////
InactivityMonitor::~InactivityMonitor() {
- try{
+ try {
this->stopMonitorThreads();
+ }
+ AMQ_CATCHALL_NOTHROW()
+
+ try {
delete this->members;
}
AMQ_CATCHALL_NOTHROW()
@@ -259,7 +254,7 @@ long long InactivityMonitor::getReadChec
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setReadCheckTime( long long value ) {
+void InactivityMonitor::setReadCheckTime(long long value) {
this->members->readCheckTime = value;
}
@@ -269,7 +264,7 @@ long long InactivityMonitor::getWriteChe
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setWriteCheckTime( long long value ) {
+void InactivityMonitor::setWriteCheckTime(long long value) {
this->members->writeCheckTime = value;
}
@@ -279,7 +274,7 @@ long long InactivityMonitor::getInitialD
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setInitialDelayTime( long long value ) const {
+void InactivityMonitor::setInitialDelayTime(long long value) const {
this->members->initialDelayTime = value;
}
@@ -289,142 +284,140 @@ bool InactivityMonitor::isKeepAliveRespo
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::setKeepAliveResponseRequired( bool value ) {
+void InactivityMonitor::setKeepAliveResponseRequired(bool value) {
this->members->keepAliveResponseRequired = value;
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::close() {
- try{
+ try {
stopMonitorThreads();
TransportFilter::close();
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::onException( const decaf::lang::Exception& ex ) {
+void InactivityMonitor::onException(const decaf::lang::Exception& ex) {
- if( this->members->failed.compareAndSet( false, true ) ) {
+ if (this->members->failed.compareAndSet(false, true)) {
stopMonitorThreads();
- TransportFilter::onException( ex );
+ TransportFilter::onException(ex);
}
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::onCommand( const Pointer<Command>& command ) {
+void InactivityMonitor::onCommand(const Pointer<Command>& command) {
- this->members->commandReceived.set( true );
- this->members->inRead.set( true );
+ this->members->commandReceived.set(true);
+ this->members->inRead.set(true);
try {
- if( command->isWireFormatInfo() ) {
- synchronized( &this->members->monitor ) {
+ if (command->isWireFormatInfo()) {
+ synchronized(&this->members->monitor) {
this->members->remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
try {
startMonitorThreads();
- } catch( IOException& e ) {
- onException( e );
+ } catch (IOException& e) {
+ onException(e);
}
}
}
- TransportFilter::onCommand( command );
+ TransportFilter::onCommand(command);
- this->members->inRead.set( false );
+ this->members->inRead.set(false);
- } catch( Exception& ex ) {
- this->members->inRead.set( false );
- ex.setMark( __FILE__, __LINE__ );
+ } catch (Exception& ex) {
+ this->members->inRead.set(false);
+ ex.setMark(__FILE__, __LINE__);
throw ex;
}
}
////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::oneway( const Pointer<Command>& command ) {
+void InactivityMonitor::oneway(const Pointer<Command>& command) {
- try{
- // Disable inactivity monitoring while processing a command. Synchronize this
- // method - its not synchronized
- // further down the transport stack and gets called by more
- // than one thread by this class
- synchronized( &this->members->inWriteMutex ) {
- this->members->inWrite.set( true );
+ try {
+ // Disable inactivity monitoring while processing a command. Synchronize this
+ // method - its not synchronized further down the transport stack and gets called
+ // by more than one thread by this class
+ synchronized(&this->members->inWriteMutex) {
+ this->members->inWrite.set(true);
try {
- if( this->members->failed.get() ) {
- throw IOException(
- __FILE__, __LINE__,
- ( std::string( "Channel was inactive for too long: " ) + next->getRemoteAddress() ).c_str() );
+ if (this->members->failed.get()) {
+ throw IOException(__FILE__, __LINE__,
+ (std::string("Channel was inactive for too long: ") + next->getRemoteAddress()).c_str());
}
- if( command->isWireFormatInfo() ) {
+ if (command->isWireFormatInfo()) {
synchronized( &this->members->monitor ) {
this->members->localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
startMonitorThreads();
}
}
- this->next->oneway( command );
+ this->next->oneway(command);
- this->members->commandSent.set( true );
- this->members->inWrite.set( false );
+ this->members->commandSent.set(true);
+ this->members->inWrite.set(false);
- } catch( Exception& ex ) {
- this->members->commandSent.set( true );
- this->members->inWrite.set( false );
+ } catch (Exception& ex) {
+ this->members->commandSent.set(true);
+ this->members->inWrite.set(false);
- ex.setMark( __FILE__, __LINE__ );
+ ex.setMark(__FILE__, __LINE__);
throw ex;
}
}
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_RETHROW( UnsupportedOperationException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_RETHROW(UnsupportedOperationException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-bool InactivityMonitor::allowReadCheck( long long elapsed ) {
- return elapsed > ( this->members->readCheckTime * 9 / 10 );
+bool InactivityMonitor::allowReadCheck(long long elapsed) {
+ return elapsed > (this->members->readCheckTime * 9 / 10);
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::readCheck() {
- if( this->members->inRead.get() || this->members->wireFormat->inReceive() ) {
+ if (this->members->inRead.get() || this->members->wireFormat->inReceive()) {
return;
}
- if( !this->members->commandReceived.get() ) {
+ if (!this->members->commandReceived.get()) {
// Set the failed state on our async Read Failure Task and wakeup its runner.
- this->members->asyncReadTask->setFailed( true );
+ this->members->asyncReadTask->setFailed(true);
this->members->asyncTasks->wakeup();
}
- this->members->commandReceived.set( false );
+ this->members->commandReceived.set(false);
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::writeCheck() {
- if( this->members->inWrite.get() ) {
+ if (this->members->inWrite.get()) {
return;
}
- if( !this->members->commandSent.get() ) {
+ if (!this->members->commandSent.get()) {
- this->members->asyncWriteTask->setWrite( true );
+ this->members->asyncWriteTask->setWrite(true);
this->members->asyncTasks->wakeup();
}
- this->members->commandSent.set( false );
+ this->members->commandSent.set(false);
}
////////////////////////////////////////////////////////////////////////////////
@@ -432,43 +425,38 @@ void InactivityMonitor::startMonitorThre
synchronized( &this->members->monitor ) {
- if( this->members->monitorStarted.get() ) {
+ if (this->members->monitorStarted.get()) {
return;
}
- if( this->members->localWireFormatInfo == NULL ) {
+ if (this->members->localWireFormatInfo == NULL) {
return;
}
- if( this->members->remoteWireFormatInfo == NULL ) {
+ if (this->members->remoteWireFormatInfo == NULL) {
return;
}
- this->members->asyncTasks.reset( new CompositeTaskRunner() );
- this->members->asyncReadTask.reset( new AsyncSignalReadErrorkTask( this, this->getRemoteAddress() ) );
- this->members->asyncWriteTask.reset( new AsyncWriteTask( this ) );
-
- this->members->asyncTasks->addTask( this->members->asyncReadTask.get() );
- this->members->asyncTasks->addTask( this->members->asyncWriteTask.get() );
-
- this->members->readCheckTime =
- Math::min( this->members->localWireFormatInfo->getMaxInactivityDuration(),
- this->members->remoteWireFormatInfo->getMaxInactivityDuration() );
-
- this->members->initialDelayTime =
- Math::min( this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
- this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
-
- if( this->members->readCheckTime > 0 ) {
-
- this->members->monitorStarted.set( true );
- this->members->writeCheckerTask.reset( new WriteChecker( this ) );
- this->members->readCheckerTask.reset( new ReadChecker( this ) );
- this->members->writeCheckTime = this->members->readCheckTime > 3 ?
- this->members->readCheckTime / 3 : this->members->readCheckTime;
-
- this->members->writeCheckTimer.scheduleAtFixedRate(
- this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime );
- this->members->readCheckTimer.scheduleAtFixedRate(
- this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime );
+ this->members->asyncTasks.reset(new CompositeTaskRunner());
+ this->members->asyncReadTask.reset(new AsyncSignalReadErrorkTask(this, this->getRemoteAddress()));
+ this->members->asyncWriteTask.reset(new AsyncWriteTask(this));
+
+ this->members->asyncTasks->addTask(this->members->asyncReadTask.get());
+ this->members->asyncTasks->addTask(this->members->asyncWriteTask.get());
+
+ this->members->readCheckTime = Math::min(this->members->localWireFormatInfo->getMaxInactivityDuration(),
+ this->members->remoteWireFormatInfo->getMaxInactivityDuration());
+
+ this->members->initialDelayTime = Math::min(this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
+ this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay());
+
+ if (this->members->readCheckTime > 0) {
+
+ this->members->monitorStarted.set(true);
+ this->members->writeCheckerTask.reset(new WriteChecker(this));
+ this->members->readCheckerTask.reset(new ReadChecker(this));
+ this->members->writeCheckTime = this->members->readCheckTime > 3 ? this->members->readCheckTime / 3 : this->members->readCheckTime;
+
+ this->members->writeCheckTimer.scheduleAtFixedRate(this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime);
+ this->members->readCheckTimer.scheduleAtFixedRate(this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime);
}
}
}
@@ -476,9 +464,9 @@ void InactivityMonitor::startMonitorThre
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::stopMonitorThreads() {
- synchronized( &this->members->monitor ) {
+ synchronized(&this->members->monitor) {
- if( this->members->monitorStarted.compareAndSet( true, false ) ) {
+ if (this->members->monitorStarted.compareAndSet(true, false)) {
this->members->readCheckerTask->cancel();
this->members->writeCheckerTask->cancel();