You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/16 18:18:57 UTC
svn commit: r880857 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity:
InactivityMonitor.cpp InactivityMonitor.h ReadChecker.cpp
Author: tabish
Date: Mon Nov 16 17:18:56 2009
New Revision: 880857
URL: http://svn.apache.org/viewvc?rev=880857&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-250
Complete but untested inactivity monitor.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Mon Nov 16 17:18:56 2009
@@ -20,17 +20,19 @@
#include "ReadChecker.h"
#include "WriteChecker.h"
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
#include <activemq/commands/WireFormatInfo.h>
#include <activemq/commands/KeepAliveInfo.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/ThreadFactory.h>
using namespace std;
using namespace activemq;
using namespace activemq::commands;
+using namespace activemq::threads;
using namespace activemq::transport;
using namespace activemq::transport::inactivity;
using namespace activemq::exceptions;
@@ -48,81 +50,139 @@
namespace transport{
namespace inactivity{
- class InactivityThreadFactory : public ThreadFactory {
+ class InactivityMonitorData {
public:
- virtual Thread* newThread( Runnable* runnable ) {
- return new Thread( runnable, "Inactivity Monitor Async Task." );
- }
+ // The configured WireFormat for the Transport Chain.
+ Pointer<WireFormat> wireFormat;
+
+ // Local and Remote WireFormat information.
+ Pointer<WireFormatInfo> localWireFormatInfo;
+ Pointer<WireFormatInfo> remoteWireFormatInfo;
+
+ Pointer<ReadChecker> readCheckerTask;
+ Pointer<WriteChecker> writeCheckerTask;
+
+ Timer readCheckTimer;
+ Timer writeCheckTimer;
+
+ Pointer<CompositeTaskRunner> asyncTasks;
+
+ Pointer<AsyncSignalReadErrorkTask> asyncReadTask;
+ Pointer<AsyncWriteTask> asyncWriteTask;
+
+ AtomicBoolean monitorStarted;
+
+ AtomicBoolean commandSent;
+ AtomicBoolean commandReceived;
+
+ AtomicBoolean failed;
+ AtomicBoolean inRead;
+ AtomicBoolean inWrite;
+
+ Mutex inWriteMutex;
+ Mutex monitor;
+
+ long long readCheckTime;
+ long long writeCheckTime;
+ long long initialDelayTime;
+
+ bool keepAliveResponseRequired;
};
- class AsyncException : decaf::lang::Runnable {
+ // Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
+ class AsyncSignalReadErrorkTask : public CompositeTask {
private:
InactivityMonitor* parent;
+ std::string remote;
+ AtomicBoolean failed;
public:
- AsyncException( InactivityMonitor* parent ) : parent( parent ) {
+ AsyncSignalReadErrorkTask( InactivityMonitor* parent, const std::string& remote ) {
+ this->parent = parent;
+ this->remote = remote;
}
- virtual void run() {
- IOException ex (
- __FILE__, __LINE__,
- ( std::string( "Channel was inactive for too long: " ) + parent->next->getRemoteAddress() ).c_str() );
- parent->onException( ex );
+ void setFailed( bool failed ) {
+ this->failed.set( failed );
}
- };
+ virtual bool isPending() const {
+ return this->failed.get();
+ }
-}}}
+ virtual bool iterate() {
-////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace transport{
-namespace inactivity{
+ if( this->failed.compareAndSet( true, false ) ) {
+
+ IOException ex (
+ __FILE__, __LINE__,
+ ( std::string( "Channel was inactive for too long: " ) + remote ).c_str() );
+
+ this->parent->onException( ex );
+ }
- class AsyncWriteKeepAlive : decaf::lang::Runnable {
+ return this->failed.get();
+ }
+ };
+
+ // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+ class AsyncWriteTask : public CompositeTask {
private:
InactivityMonitor* parent;
+ AtomicBoolean write;
public:
- AsyncWriteKeepAlive( InactivityMonitor* parent ) : parent( parent ) {
+ AsyncWriteTask( InactivityMonitor* parent ) : parent( parent ) {
}
- virtual void run() {
- if( parent->monitorStarted.get() ) {
+ void setWrite( bool write ) {
+ this->write.set( write );
+ }
+
+ virtual bool isPending() const {
+ return this->write.get();
+ }
+
+ virtual bool iterate() {
+
+ if( this->write.compareAndSet( true, false ) &&
+ this->parent->members->monitorStarted.get() ) {
+
try {
Pointer<KeepAliveInfo> info( new KeepAliveInfo() );
- info->setResponseRequired( parent->keepAliveResponseRequired );
- parent->oneway( info );
- } catch( IOException e ) {
- parent->onException( e );
+ info->setResponseRequired( this->parent->members->keepAliveResponseRequired );
+ this->parent->oneway( info );
+ } catch( IOException& e ) {
+ this->parent->onException( e );
}
}
- }
+ return this->write.get();
+ }
};
}}}
////////////////////////////////////////////////////////////////////////////////
InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat )
-: TransportFilter( next ),
- wireFormat( wireFormat ),
- monitorStarted( false ),
- commandSent( false ),
- commandReceived( true),
- failed( false ),
- inRead( false ),
- inWrite( false ),
- readCheckTime( 0 ),
- writeCheckTime( 0 ),
- initialDelayTime( 0 ),
- keepAliveResponseRequired( false ) {
+: TransportFilter( next ), members( new InactivityMonitorData() ) {
+ this->members->wireFormat = wireFormat;
+ this->members->monitorStarted = false;
+ this->members->commandSent = false;
+ this->members->commandReceived = true;
+ this->members->failed = false;
+ this->members->inRead = false;
+ this->members->inWrite = false;
+ this->members->readCheckTime = 0;
+ this->members->writeCheckTime = 0;
+ this->members->initialDelayTime = 0;
+ this->members->keepAliveResponseRequired = false;
}
////////////////////////////////////////////////////////////////////////////////
@@ -134,6 +194,46 @@
}
////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getReadCheckTime() const {
+ return this->members->readCheckTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setReadCheckTime( long long value ) {
+ this->members->readCheckTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getWriteCheckTime() const {
+ return this->members->writeCheckTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setWriteCheckTime( long long value ) {
+ this->members->writeCheckTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getInitialDelayTime() const {
+ return this->members->initialDelayTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setInitialDelayTime( long long value ) const {
+ this->members->initialDelayTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool InactivityMonitor::isKeepAliveResponseRequired() const {
+ return this->members->keepAliveResponseRequired;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setKeepAliveResponseRequired( bool value ) {
+ this->members->keepAliveResponseRequired = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::close() throw( decaf::io::IOException ) {
try{
stopMonitorThreads();
@@ -147,7 +247,7 @@
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::onException( const decaf::lang::Exception& ex ) {
- if( failed.compareAndSet( false, true ) ) {
+ if( this->members->failed.compareAndSet( false, true ) ) {
stopMonitorThreads();
TransportFilter::onException( ex );
}
@@ -158,15 +258,15 @@
TransportFilter::onCommand( command );
- commandReceived.set( true );
- inRead.set( true );
+ this->members->commandReceived.set( true );
+ this->members->inRead.set( true );
try {
if( command->isWireFormatInfo() ) {
- synchronized( &monitor ) {
+ synchronized( &this->members->monitor ) {
- remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+ this->members->remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
try {
startMonitorThreads();
} catch( IOException& e ) {
@@ -177,10 +277,10 @@
TransportFilter::onCommand( command );
- inRead.set( false );
+ this->members->inRead.set( false );
} catch( Exception& ex ) {
- inRead.set( false );
+ this->members->inRead.set( false );
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
@@ -195,31 +295,31 @@
// method - its not synchronized
// further down the transport stack and gets called by more
// than one thread by this class
- synchronized( &inWriteMutex ) {
- this->inWrite.set( true );
+ synchronized( &this->members->inWriteMutex ) {
+ this->members->inWrite.set( true );
try {
- if( failed.get() ) {
+ if( this->members->failed.get() ) {
throw IOException(
__FILE__, __LINE__,
( std::string( "Channel was inactive for too long: " ) + next->getRemoteAddress() ).c_str() );
}
if( command->isWireFormatInfo() ) {
- synchronized( &monitor ) {
- localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+ synchronized( &this->members->monitor ) {
+ this->members->localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
startMonitorThreads();
}
}
this->next->oneway( command );
- this->commandSent.set( true );
- this->inWrite.set( false );
+ this->members->commandSent.set( true );
+ this->members->inWrite.set( false );
} catch( Exception& ex ) {
- this->commandSent.set( true );
- this->inWrite.set( false );
+ this->members->commandSent.set( true );
+ this->members->inWrite.set( false );
ex.setMark( __FILE__, __LINE__ );
throw ex;
@@ -234,84 +334,94 @@
////////////////////////////////////////////////////////////////////////////////
bool InactivityMonitor::allowReadCheck( long long elapsed ) {
- return elapsed > (readCheckTime * 9 / 10);
+ return elapsed > ( this->members->readCheckTime * 9 / 10 );
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::readCheck() {
- if( inRead.get() || wireFormat->inReceive() ) {
+ if( this->members->inRead.get() || this->members->wireFormat->inReceive() ) {
return;
}
- if( !commandReceived.get() ) {
-// ASYNC_TASKS.execute( new Runnable() {
-// public void run() {
-// onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
-// };
-//
-// });
+ std::cout << "Performing a Read Check" << std::endl;
+
+ if( !this->members->commandReceived.get() ) {
+
+ std::cout << "Read Check: No Read occurred this period, failed." << std::endl;
+
+ // Set the failed state on our async Read Failure Task and wakeup its runner.
+ this->members->asyncReadTask->setFailed( true );
+ this->members->asyncTasks->wakeup();
}
- commandReceived.set( false );
+ this->members->commandReceived.set( false );
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::writeCheck() {
- if( inWrite.get() ) {
+ if( this->members->inWrite.get() ) {
return;
}
- if (!commandSent.get()) {
+ std::cout << "Performing Write Check" << std::endl;
+
+ if(! this->members->commandSent.get() ) {
+
+ std::cout << "Not Command Sent this period, writing" << std::endl;
-// ASYNC_TASKS.execute( new Runnable() {
-// public void run() {
-// if (monitorStarted.get()) {
-// try {
-//
-// KeepAliveInfo info = new KeepAliveInfo();
-// info.setResponseRequired(keepAliveResponseRequired);
-// oneway(info);
-// } catch (IOException e) {
-// onException(e);
-// }
-// }
-// };
-// });
+ this->members->asyncWriteTask->setWrite( true );
+ this->members->asyncTasks->wakeup();
}
- commandSent.set( false );
+ this->members->commandSent.set( false );
}
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::startMonitorThreads() {
- synchronized( &monitor ) {
+ synchronized( &this->members->monitor ) {
- if( monitorStarted.get() ) {
+ std::cout << "Starting Monitor Threads:" << std::endl;
+
+ if( this->members->monitorStarted.get() ) {
return;
}
- if( localWireFormatInfo == NULL ) {
+ if( this->members->localWireFormatInfo == NULL ) {
return;
}
- if( remoteWireFormatInfo == NULL ) {
+ if( this->members->remoteWireFormatInfo == NULL ) {
return;
}
- readCheckTime = Math::min( localWireFormatInfo->getMaxInactivityDuration(),
- remoteWireFormatInfo->getMaxInactivityDuration() );
- initialDelayTime = Math::min( localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
- remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
- if( readCheckTime > 0 ) {
-
- monitorStarted.set( true );
- writeCheckerTask.reset( new WriteChecker( this ) );
- readCheckerTask.reset( new ReadChecker( this ) );
- writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
-
- this->writeCheckTimer.scheduleAtFixedRate( writeCheckerTask, initialDelayTime, writeCheckTime );
- this->readCheckTimer.scheduleAtFixedRate( readCheckerTask, initialDelayTime, readCheckTime );
+ this->members->asyncTasks.reset( new CompositeTaskRunner() );
+ this->members->asyncReadTask.reset( new AsyncSignalReadErrorkTask( this, this->getRemoteAddress() ) );
+ this->members->asyncWriteTask.reset( new AsyncWriteTask( this ) );
+
+ this->members->asyncTasks->addTask( this->members->asyncReadTask.get() );
+ this->members->asyncTasks->addTask( this->members->asyncWriteTask.get() );
+
+ this->members->readCheckTime =
+ Math::min( this->members->localWireFormatInfo->getMaxInactivityDuration(),
+ this->members->remoteWireFormatInfo->getMaxInactivityDuration() );
+
+ this->members->initialDelayTime =
+ Math::min( this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
+ this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
+
+ if( this->members->readCheckTime > 0 ) {
+
+ this->members->monitorStarted.set( true );
+ this->members->writeCheckerTask.reset( new WriteChecker( this ) );
+ this->members->readCheckerTask.reset( new ReadChecker( this ) );
+ this->members->writeCheckTime = this->members->readCheckTime > 3 ?
+ this->members->readCheckTime / 3 : this->members->readCheckTime;
+
+ this->members->writeCheckTimer.scheduleAtFixedRate(
+ this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime );
+ this->members->readCheckTimer.scheduleAtFixedRate(
+ this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime );
}
}
}
@@ -319,17 +429,21 @@
////////////////////////////////////////////////////////////////////////////////
void InactivityMonitor::stopMonitorThreads() {
- synchronized( &monitor ) {
+ synchronized( &this->members->monitor ) {
+
+ std::cout << "Stopping Monitor Threads:" << std::endl;
+
+ if( this->members->monitorStarted.compareAndSet( true, false ) ) {
- if( monitorStarted.compareAndSet( true, false ) ) {
+ this->members->readCheckerTask->cancel();
+ this->members->writeCheckerTask->cancel();
- readCheckerTask->cancel();
- writeCheckerTask->cancel();
+ this->members->readCheckTimer.purge();
+ this->members->readCheckTimer.cancel();
+ this->members->writeCheckTimer.purge();
+ this->members->writeCheckTimer.cancel();
- readCheckTimer.purge();
- readCheckTimer.cancel();
- writeCheckTimer.purge();
- writeCheckTimer.cancel();
+ this->members->asyncTasks->shutdown();
}
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h Mon Nov 16 17:18:56 2009
@@ -30,6 +30,8 @@
#include <decaf/util/Timer.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <memory>
+
namespace activemq {
namespace transport {
namespace inactivity {
@@ -38,50 +40,20 @@
class ReadChecker;
class WriteChecker;
- class AsyncException;
- class AsyncWriteKeepAlive;
+ class AsyncSignalReadErrorkTask;
+ class AsyncWriteTask;
+ class InactivityMonitorData;
class AMQCPP_API InactivityMonitor : public TransportFilter {
private:
- // The configured WireFormat for the Transport Chain.
- Pointer<wireformat::WireFormat> wireFormat;
-
- // Local and Remote WireFormat information.
- Pointer<commands::WireFormatInfo> localWireFormatInfo;
- Pointer<commands::WireFormatInfo> remoteWireFormatInfo;
-
- Pointer<ReadChecker> readCheckerTask;
- Pointer<WriteChecker> writeCheckerTask;
-
- // TODO - We could optimize so that all instances of an Inactivity monitor share a single
- // static instance of the read and write timers. Have to track the number of tasks
- // that are scheduled then so we know when to cancel and cleanup the timers.
- decaf::util::Timer readCheckTimer;
- decaf::util::Timer writeCheckTimer;
-
- decaf::util::concurrent::atomic::AtomicBoolean monitorStarted;
-
- decaf::util::concurrent::atomic::AtomicBoolean commandSent;
- decaf::util::concurrent::atomic::AtomicBoolean commandReceived;
-
- decaf::util::concurrent::atomic::AtomicBoolean failed;
- decaf::util::concurrent::atomic::AtomicBoolean inRead;
- decaf::util::concurrent::atomic::AtomicBoolean inWrite;
-
- decaf::util::concurrent::Mutex inWriteMutex;
- decaf::util::concurrent::Mutex monitor;
-
- long long readCheckTime;
- long long writeCheckTime;
- long long initialDelayTime;
+ // Internal Class used to house the data structures for this object
+ std::auto_ptr<InactivityMonitorData> members;
friend class ReadChecker;
+ friend class AsyncSignalReadErrorkTask;
friend class WriteChecker;
- friend class AsyncException;
- friend class AsyncWriteKeepAlive;
-
- bool keepAliveResponseRequired;
+ friend class AsyncWriteTask;
public:
@@ -104,13 +76,21 @@
virtual void oneway( const Pointer<Command>& command )
throw( decaf::io::IOException, decaf::lang::exceptions::UnsupportedOperationException );
- bool isKeepAliveResponseRequired() const {
- return this->keepAliveResponseRequired;
- }
-
- void setKeepAliveResponseRequired( bool value ) {
- this->keepAliveResponseRequired = value;
- }
+ bool isKeepAliveResponseRequired() const;
+
+ void setKeepAliveResponseRequired( bool value );
+
+ long long getReadCheckTime() const;
+
+ void setReadCheckTime( long long value );
+
+ long long getWriteCheckTime() const;
+
+ void setWriteCheckTime( long long value );
+
+ long long getInitialDelayTime() const;
+
+ void setInitialDelayTime( long long value ) const;
private:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp Mon Nov 16 17:18:56 2009
@@ -47,7 +47,7 @@
void ReadChecker::run() {
long long now = System::currentTimeMillis();
- long long elapsed = ( now - lastRunTime );
+ long long elapsed = ( now - this->lastRunTime );
// Perhaps the timer executed a read check late.. and then executes
// the next read check on time which causes the time elapsed between
@@ -58,7 +58,7 @@
return;
}
- lastRunTime = now;
+ this->lastRunTime = now;
// Invoke the parent check routine.
this->parent->readCheck();