You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/16 18:18:57 UTC

svn commit: r880857 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity: InactivityMonitor.cpp InactivityMonitor.h ReadChecker.cpp

Author: tabish
Date: Mon Nov 16 17:18:56 2009
New Revision: 880857

URL: http://svn.apache.org/viewvc?rev=880857&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-250

Complete but untested inactivity monitor.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Mon Nov 16 17:18:56 2009
@@ -20,17 +20,19 @@
 #include "ReadChecker.h"
 #include "WriteChecker.h"
 
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
 #include <activemq/commands/WireFormatInfo.h>
 #include <activemq/commands/KeepAliveInfo.h>
 
 #include <decaf/lang/Math.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/ThreadFactory.h>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::commands;
+using namespace activemq::threads;
 using namespace activemq::transport;
 using namespace activemq::transport::inactivity;
 using namespace activemq::exceptions;
@@ -48,81 +50,139 @@
 namespace transport{
 namespace inactivity{
 
-    class InactivityThreadFactory : public ThreadFactory {
+    class InactivityMonitorData {
     public:
 
-        virtual Thread* newThread( Runnable* runnable ) {
-            return new Thread( runnable, "Inactivity Monitor Async Task." );
-        }
+        // The configured WireFormat for the Transport Chain.
+        Pointer<WireFormat> wireFormat;
+
+        // Local and Remote WireFormat information.
+        Pointer<WireFormatInfo> localWireFormatInfo;
+        Pointer<WireFormatInfo> remoteWireFormatInfo;
+
+        Pointer<ReadChecker> readCheckerTask;
+        Pointer<WriteChecker> writeCheckerTask;
+
+        Timer readCheckTimer;
+        Timer writeCheckTimer;
+
+        Pointer<CompositeTaskRunner> asyncTasks;
+
+        Pointer<AsyncSignalReadErrorkTask> asyncReadTask;
+        Pointer<AsyncWriteTask> asyncWriteTask;
+
+        AtomicBoolean monitorStarted;
+
+        AtomicBoolean commandSent;
+        AtomicBoolean commandReceived;
+
+        AtomicBoolean failed;
+        AtomicBoolean inRead;
+        AtomicBoolean inWrite;
+
+        Mutex inWriteMutex;
+        Mutex monitor;
+
+        long long readCheckTime;
+        long long writeCheckTime;
+        long long initialDelayTime;
+
+        bool keepAliveResponseRequired;
     };
 
-    class AsyncException : decaf::lang::Runnable {
+    // Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
+    class AsyncSignalReadErrorkTask : public CompositeTask {
     private:
 
         InactivityMonitor* parent;
+        std::string remote;
+        AtomicBoolean failed;
 
     public:
 
-        AsyncException( InactivityMonitor* parent ) : parent( parent ) {
+        AsyncSignalReadErrorkTask( InactivityMonitor* parent, const std::string& remote ) {
+            this->parent = parent;
+            this->remote = remote;
         }
 
-        virtual void run() {
-            IOException ex (
-                __FILE__, __LINE__,
-                ( std::string( "Channel was inactive for too long: " ) + parent->next->getRemoteAddress() ).c_str() );
-            parent->onException( ex );
+        void setFailed( bool failed ) {
+            this->failed.set( failed );
         }
 
-    };
+        virtual bool isPending() const {
+            return this->failed.get();
+        }
 
-}}}
+        virtual bool iterate() {
 
-////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace transport{
-namespace inactivity{
+            if( this->failed.compareAndSet( true, false ) ) {
+
+                IOException ex (
+                    __FILE__, __LINE__,
+                    ( std::string( "Channel was inactive for too long: " ) + remote ).c_str() );
+
+                this->parent->onException( ex );
+            }
 
-    class AsyncWriteKeepAlive : decaf::lang::Runnable {
+            return this->failed.get();
+        }
+    };
+
+    // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
+    class AsyncWriteTask : public CompositeTask {
     private:
 
         InactivityMonitor* parent;
+        AtomicBoolean write;
 
     public:
 
-        AsyncWriteKeepAlive( InactivityMonitor* parent ) : parent( parent ) {
+        AsyncWriteTask( InactivityMonitor* parent ) : parent( parent ) {
         }
 
-        virtual void run() {
-            if( parent->monitorStarted.get() ) {
+        void setWrite( bool write ) {
+            this->write.set( write );
+        }
+
+        virtual bool isPending() const {
+            return this->write.get();
+        }
+
+        virtual bool iterate() {
+
+            if( this->write.compareAndSet( true, false ) &&
+                this->parent->members->monitorStarted.get() ) {
+
                 try {
                     Pointer<KeepAliveInfo> info( new KeepAliveInfo() );
-                    info->setResponseRequired( parent->keepAliveResponseRequired );
-                    parent->oneway( info );
-                } catch( IOException e ) {
-                    parent->onException( e );
+                    info->setResponseRequired( this->parent->members->keepAliveResponseRequired );
+                    this->parent->oneway( info );
+                } catch( IOException& e ) {
+                    this->parent->onException( e );
                 }
             }
-        }
 
+            return this->write.get();
+        }
     };
 
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
 InactivityMonitor::InactivityMonitor( const Pointer<Transport>& next, const Pointer<WireFormat>& wireFormat )
-:   TransportFilter( next ),
-    wireFormat( wireFormat ),
-    monitorStarted( false ),
-    commandSent( false ),
-    commandReceived( true),
-    failed( false ),
-    inRead( false ),
-    inWrite( false ),
-    readCheckTime( 0 ),
-    writeCheckTime( 0 ),
-    initialDelayTime( 0 ),
-    keepAliveResponseRequired( false ) {
+:   TransportFilter( next ), members( new InactivityMonitorData() ) {
 
+    this->members->wireFormat = wireFormat;
+    this->members->monitorStarted = false;
+    this->members->commandSent = false;
+    this->members->commandReceived = true;
+    this->members->failed = false;
+    this->members->inRead = false;
+    this->members->inWrite = false;
+    this->members->readCheckTime = 0;
+    this->members->writeCheckTime = 0;
+    this->members->initialDelayTime = 0;
+    this->members->keepAliveResponseRequired = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -134,6 +194,46 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getReadCheckTime() const {
+    return this->members->readCheckTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setReadCheckTime( long long value ) {
+    this->members->readCheckTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getWriteCheckTime() const {
+    return this->members->writeCheckTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setWriteCheckTime( long long value ) {
+    this->members->writeCheckTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long InactivityMonitor::getInitialDelayTime() const {
+    return this->members->initialDelayTime;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setInitialDelayTime( long long value ) const {
+    this->members->initialDelayTime = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool InactivityMonitor::isKeepAliveResponseRequired() const {
+    return this->members->keepAliveResponseRequired;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::setKeepAliveResponseRequired( bool value ) {
+    this->members->keepAliveResponseRequired = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::close() throw( decaf::io::IOException ) {
     try{
         stopMonitorThreads();
@@ -147,7 +247,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::onException( const decaf::lang::Exception& ex ) {
 
-    if( failed.compareAndSet( false, true ) ) {
+    if( this->members->failed.compareAndSet( false, true ) ) {
         stopMonitorThreads();
         TransportFilter::onException( ex );
     }
@@ -158,15 +258,15 @@
 
     TransportFilter::onCommand( command );
 
-    commandReceived.set( true );
-    inRead.set( true );
+    this->members->commandReceived.set( true );
+    this->members->inRead.set( true );
 
     try {
 
         if( command->isWireFormatInfo() ) {
-            synchronized( &monitor ) {
+            synchronized( &this->members->monitor ) {
 
-                remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+                this->members->remoteWireFormatInfo = command.dynamicCast<WireFormatInfo>();
                 try {
                     startMonitorThreads();
                 } catch( IOException& e ) {
@@ -177,10 +277,10 @@
 
         TransportFilter::onCommand( command );
 
-        inRead.set( false );
+        this->members->inRead.set( false );
 
     } catch( Exception& ex ) {
-        inRead.set( false );
+        this->members->inRead.set( false );
         ex.setMark( __FILE__, __LINE__ );
         throw ex;
     }
@@ -195,31 +295,31 @@
         // method - its not synchronized
         // further down the transport stack and gets called by more
         // than one thread  by this class
-        synchronized( &inWriteMutex ) {
-            this->inWrite.set( true );
+        synchronized( &this->members->inWriteMutex ) {
+            this->members->inWrite.set( true );
             try {
 
-                if( failed.get() ) {
+                if( this->members->failed.get() ) {
                     throw IOException(
                         __FILE__, __LINE__,
                         ( std::string( "Channel was inactive for too long: " ) + next->getRemoteAddress() ).c_str() );
                 }
 
                 if( command->isWireFormatInfo() ) {
-                    synchronized( &monitor ) {
-                        localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+                    synchronized( &this->members->monitor ) {
+                        this->members->localWireFormatInfo = command.dynamicCast<WireFormatInfo>();
                         startMonitorThreads();
                     }
                 }
 
                 this->next->oneway( command );
 
-                this->commandSent.set( true );
-                this->inWrite.set( false );
+                this->members->commandSent.set( true );
+                this->members->inWrite.set( false );
 
             } catch( Exception& ex ) {
-                this->commandSent.set( true );
-                this->inWrite.set( false );
+                this->members->commandSent.set( true );
+                this->members->inWrite.set( false );
 
                 ex.setMark( __FILE__, __LINE__ );
                 throw ex;
@@ -234,84 +334,94 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 bool InactivityMonitor::allowReadCheck( long long elapsed ) {
-    return elapsed > (readCheckTime * 9 / 10);
+    return elapsed > ( this->members->readCheckTime * 9 / 10 );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::readCheck() {
 
-    if( inRead.get() || wireFormat->inReceive() ) {
+    if( this->members->inRead.get() || this->members->wireFormat->inReceive() ) {
         return;
     }
 
-    if( !commandReceived.get() ) {
-//        ASYNC_TASKS.execute( new Runnable() {
-//            public void run() {
-//                onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
-//            };
-//
-//        });
+    std::cout << "Performing a Read Check" << std::endl;
+
+    if( !this->members->commandReceived.get() ) {
+
+        std::cout << "Read Check: No Read occurred this period, failed." << std::endl;
+
+        // Set the failed state on our async Read Failure Task and wakeup its runner.
+        this->members->asyncReadTask->setFailed( true );
+        this->members->asyncTasks->wakeup();
     }
 
-    commandReceived.set( false );
+    this->members->commandReceived.set( false );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::writeCheck() {
 
-    if( inWrite.get() ) {
+    if( this->members->inWrite.get() ) {
         return;
     }
 
-    if (!commandSent.get()) {
+    std::cout << "Performing Write Check" << std::endl;
+
+    if(! this->members->commandSent.get() ) {
+
+        std::cout << "Not Command Sent this period, writing" << std::endl;
 
-//        ASYNC_TASKS.execute( new Runnable() {
-//            public void run() {
-//                if (monitorStarted.get()) {
-//                    try {
-//
-//                        KeepAliveInfo info = new KeepAliveInfo();
-//                        info.setResponseRequired(keepAliveResponseRequired);
-//                        oneway(info);
-//                    } catch (IOException e) {
-//                        onException(e);
-//                    }
-//                }
-//            };
-//        });
+        this->members->asyncWriteTask->setWrite( true );
+        this->members->asyncTasks->wakeup();
     }
 
-    commandSent.set( false );
+    this->members->commandSent.set( false );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::startMonitorThreads() {
 
-    synchronized( &monitor ) {
+    synchronized( &this->members->monitor ) {
 
-        if( monitorStarted.get() ) {
+        std::cout << "Starting Monitor Threads:" << std::endl;
+
+        if( this->members->monitorStarted.get() ) {
             return;
         }
-        if( localWireFormatInfo == NULL ) {
+        if( this->members->localWireFormatInfo == NULL ) {
             return;
         }
-        if( remoteWireFormatInfo == NULL ) {
+        if( this->members->remoteWireFormatInfo == NULL ) {
             return;
         }
 
-        readCheckTime = Math::min( localWireFormatInfo->getMaxInactivityDuration(),
-                                   remoteWireFormatInfo->getMaxInactivityDuration() );
-        initialDelayTime = Math::min( localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
-                                      remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
-        if( readCheckTime > 0 ) {
-
-            monitorStarted.set( true );
-            writeCheckerTask.reset( new WriteChecker( this ) );
-            readCheckerTask.reset( new ReadChecker( this ) );
-            writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
-
-            this->writeCheckTimer.scheduleAtFixedRate( writeCheckerTask, initialDelayTime, writeCheckTime );
-            this->readCheckTimer.scheduleAtFixedRate( readCheckerTask, initialDelayTime, readCheckTime );
+        this->members->asyncTasks.reset( new CompositeTaskRunner() );
+        this->members->asyncReadTask.reset( new AsyncSignalReadErrorkTask( this, this->getRemoteAddress() ) );
+        this->members->asyncWriteTask.reset( new AsyncWriteTask( this ) );
+
+        this->members->asyncTasks->addTask( this->members->asyncReadTask.get() );
+        this->members->asyncTasks->addTask( this->members->asyncWriteTask.get() );
+
+        this->members->readCheckTime =
+            Math::min( this->members->localWireFormatInfo->getMaxInactivityDuration(),
+                        this->members->remoteWireFormatInfo->getMaxInactivityDuration() );
+
+        this->members->initialDelayTime =
+            Math::min( this->members->localWireFormatInfo->getMaxInactivityDurationInitalDelay(),
+                       this->members->remoteWireFormatInfo->getMaxInactivityDurationInitalDelay() );
+
+        if( this->members->readCheckTime > 0 ) {
+
+            this->members->monitorStarted.set( true );
+            this->members->writeCheckerTask.reset( new WriteChecker( this ) );
+            this->members->readCheckerTask.reset( new ReadChecker( this ) );
+            this->members->writeCheckTime = this->members->readCheckTime > 3 ?
+                                                this->members->readCheckTime / 3 : this->members->readCheckTime;
+
+            this->members->writeCheckTimer.scheduleAtFixedRate(
+                this->members->writeCheckerTask, this->members->initialDelayTime, this->members->writeCheckTime );
+            this->members->readCheckTimer.scheduleAtFixedRate(
+                this->members->readCheckerTask, this->members->initialDelayTime, this->members->readCheckTime );
         }
     }
 }
@@ -319,17 +429,21 @@
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::stopMonitorThreads() {
 
-    synchronized( &monitor ) {
+    synchronized( &this->members->monitor ) {
+
+        std::cout << "Stopping Monitor Threads:" << std::endl;
+
+        if( this->members->monitorStarted.compareAndSet( true, false ) ) {
 
-        if( monitorStarted.compareAndSet( true, false ) ) {
+            this->members->readCheckerTask->cancel();
+            this->members->writeCheckerTask->cancel();
 
-            readCheckerTask->cancel();
-            writeCheckerTask->cancel();
+            this->members->readCheckTimer.purge();
+            this->members->readCheckTimer.cancel();
+            this->members->writeCheckTimer.purge();
+            this->members->writeCheckTimer.cancel();
 
-            readCheckTimer.purge();
-            readCheckTimer.cancel();
-            writeCheckTimer.purge();
-            writeCheckTimer.cancel();
+            this->members->asyncTasks->shutdown();
         }
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h Mon Nov 16 17:18:56 2009
@@ -30,6 +30,8 @@
 #include <decaf/util/Timer.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 
+#include <memory>
+
 namespace activemq {
 namespace transport {
 namespace inactivity {
@@ -38,50 +40,20 @@
 
     class ReadChecker;
     class WriteChecker;
-    class AsyncException;
-    class AsyncWriteKeepAlive;
+    class AsyncSignalReadErrorkTask;
+    class AsyncWriteTask;
+    class InactivityMonitorData;
 
     class AMQCPP_API InactivityMonitor : public TransportFilter {
     private:
 
-        // The configured WireFormat for the Transport Chain.
-        Pointer<wireformat::WireFormat> wireFormat;
-
-        // Local and Remote WireFormat information.
-        Pointer<commands::WireFormatInfo> localWireFormatInfo;
-        Pointer<commands::WireFormatInfo> remoteWireFormatInfo;
-
-        Pointer<ReadChecker> readCheckerTask;
-        Pointer<WriteChecker> writeCheckerTask;
-
-        // TODO - We could optimize so that all instances of an Inactivity monitor share a single
-        //        static instance of the read and write timers.  Have to track the number of tasks
-        //        that are scheduled then so we know when to cancel and cleanup the timers.
-        decaf::util::Timer readCheckTimer;
-        decaf::util::Timer writeCheckTimer;
-
-        decaf::util::concurrent::atomic::AtomicBoolean monitorStarted;
-
-        decaf::util::concurrent::atomic::AtomicBoolean commandSent;
-        decaf::util::concurrent::atomic::AtomicBoolean commandReceived;
-
-        decaf::util::concurrent::atomic::AtomicBoolean failed;
-        decaf::util::concurrent::atomic::AtomicBoolean inRead;
-        decaf::util::concurrent::atomic::AtomicBoolean inWrite;
-
-        decaf::util::concurrent::Mutex inWriteMutex;
-        decaf::util::concurrent::Mutex monitor;
-
-        long long readCheckTime;
-        long long writeCheckTime;
-        long long initialDelayTime;
+        // Internal Class used to house the data structures for this object
+        std::auto_ptr<InactivityMonitorData> members;
 
         friend class ReadChecker;
+        friend class AsyncSignalReadErrorkTask;
         friend class WriteChecker;
-        friend class AsyncException;
-        friend class AsyncWriteKeepAlive;
-
-        bool keepAliveResponseRequired;
+        friend class AsyncWriteTask;
 
     public:
 
@@ -104,13 +76,21 @@
         virtual void oneway( const Pointer<Command>& command )
             throw( decaf::io::IOException, decaf::lang::exceptions::UnsupportedOperationException );
 
-        bool isKeepAliveResponseRequired() const {
-            return this->keepAliveResponseRequired;
-        }
-
-        void setKeepAliveResponseRequired( bool value ) {
-            this->keepAliveResponseRequired = value;
-        }
+        bool isKeepAliveResponseRequired() const;
+
+        void setKeepAliveResponseRequired( bool value );
+
+        long long getReadCheckTime() const;
+
+        void setReadCheckTime( long long value );
+
+        long long getWriteCheckTime() const;
+
+        void setWriteCheckTime( long long value );
+
+        long long getInitialDelayTime() const;
+
+        void setInitialDelayTime( long long value ) const;
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp?rev=880857&r1=880856&r2=880857&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp Mon Nov 16 17:18:56 2009
@@ -47,7 +47,7 @@
 void ReadChecker::run() {
 
     long long now = System::currentTimeMillis();
-    long long elapsed = ( now - lastRunTime );
+    long long elapsed = ( now - this->lastRunTime );
 
     // Perhaps the timer executed a read check late.. and then executes
     // the next read check on time which causes the time elapsed between
@@ -58,7 +58,7 @@
         return;
     }
 
-    lastRunTime = now;
+    this->lastRunTime = now;
 
     // Invoke the parent check routine.
     this->parent->readCheck();