You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/24 16:23:59 UTC
svn commit: r757843 - in /activemq/activemq-cpp/trunk/src: main/
main/activemq/threads/ main/activemq/transport/failover/
main/activemq/util/ test/ test/activemq/threads/ test/activemq/util/
Author: tabish
Date: Tue Mar 24 15:23:52 2009
New Revision: 757843
URL: http://svn.apache.org/viewvc?rev=757843&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100
Refactored TaskRunner classes to add a composite task runner and a dedicated task runner for use failover transport. The dedicated task runner will be used to close transports out of band to avoid deadlock when a transport is close as a result of a call to the onException method from the inner thread of the IOTransport.
Added:
activemq/activemq-cpp/trunk/src/main/activemq/threads/
activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h (with props)
activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h (with props)
activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h (with props)
activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h
- copied, changed from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h
activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h
- copied, changed from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h
activemq/activemq-cpp/trunk/src/test/activemq/threads/
activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp (with props)
activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h (with props)
activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp
- copied, changed from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h
- copied, changed from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h
Removed:
activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h
activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.cpp
activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h
activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h
Modified:
activemq/activemq-cpp/trunk/src/main/Makefile.am
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
activemq/activemq-cpp/trunk/src/test/Makefile.am
activemq/activemq-cpp/trunk/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Tue Mar 24 15:23:52 2009
@@ -314,7 +314,6 @@
activemq/util/PrimitiveValueNode.cpp \
activemq/util/CompositeData.cpp \
activemq/util/MemoryUsage.cpp \
- activemq/util/TaskRunner.cpp \
activemq/util/LongSequenceGenerator.cpp \
activemq/util/PrimitiveList.cpp \
activemq/state/ConnectionStateTracker.cpp \
@@ -325,6 +324,8 @@
activemq/state/ConnectionState.cpp \
activemq/state/ProducerState.cpp \
activemq/exceptions/ActiveMQException.cpp \
+ activemq/threads/DedicatedTaskRunner.cpp \
+ activemq/threads/CompositeTaskRunner.cpp \
activemq/core/ActiveMQSessionExecutor.cpp \
activemq/core/ActiveMQConstants.cpp \
activemq/core/ActiveMQConnectionMetaData.cpp \
@@ -847,9 +848,7 @@
activemq/util/CompositeData.h \
activemq/util/Usage.h \
activemq/util/LongSequenceGenerator.h \
- activemq/util/TaskRunner.h \
activemq/util/Config.h \
- activemq/util/Task.h \
activemq/util/PrimitiveList.h \
activemq/util/PrimitiveMap.h \
activemq/state/CommandVisitor.h \
@@ -864,6 +863,11 @@
activemq/exceptions/ExceptionDefines.h \
activemq/exceptions/ActiveMQException.h \
activemq/exceptions/BrokerException.h \
+ activemq/threads/CompositeTaskRunner.h \
+ activemq/threads/DedicatedTaskRunner.h \
+ activemq/threads/CompositeTask.h \
+ activemq/threads/TaskRunner.h \
+ activemq/threads/Task.h \
activemq/core/ActiveMQConsumer.h \
activemq/core/ActiveMQConnection.h \
activemq/core/ActiveMQConstants.h \
Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASK_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASK_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/Task.h>
+
+namespace activemq {
+namespace threads {
+
+ /**
+ * Represents a single task that can be part of a set of Tasks that are contained
+ * in a <code>CompositeTaskRunner</code>.
+ *
+ * @since 3.0
+ */
+ class AMQCPP_API CompositeTask : public activemq::threads::Task {
+ public:
+
+ virtual ~CompositeTask() {}
+
+ /**
+ * Indicates whether this task has any pending work that needs to be
+ * done, if not then it is skipped and the next Task in the
+ * CompositeTaskRunner's list of tasks is checked, if none of the tasks
+ * have any pending work to do, then the runner can go to sleep until it
+ * awakened by a call to <code>wakeup</code>.
+ *
+ * @since 3.0
+ */
+ virtual bool isPending() const = 0;
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASK_H_ */
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompositeTaskRunner.h"
+
+#include <memory>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTaskRunner::CompositeTaskRunner() {
+
+ this->threadTerminated = false;
+ this->pending = false;
+ this->shutDown = false;
+
+ this->thread.reset( new Thread( this ) );
+ this->thread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTaskRunner::~CompositeTaskRunner() {
+ try{
+ this->shutdown();
+ }
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::shutdown( unsigned int timeout ) {
+
+ synchronized( &mutex ) {
+ shutDown = true;
+ pending = true;
+ mutex.notifyAll();
+
+ // Wait till the thread stops ( no need to wait if shutdown
+ // is called from thread that is shutting down)
+ if( !threadTerminated ) {
+ mutex.wait( timeout );
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::shutdown() {
+
+ synchronized( &mutex ) {
+ shutDown = true;
+ pending = true;
+ mutex.notifyAll();
+ }
+
+ // Wait till the thread stops
+ if( !threadTerminated ) {
+ this->thread->join();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::wakeup() {
+
+ synchronized( &mutex ) {
+ if( shutDown) {
+ return;
+ }
+ pending = true;
+ mutex.notifyAll();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::run() {
+
+ try {
+
+ while( true ) {
+
+ synchronized( &mutex ) {
+ pending = false;
+ if( shutDown ) {
+ return;
+ }
+ }
+
+ if( !this->iterate() ) {
+
+ // wait to be notified.
+ synchronized( &mutex ) {
+ if( shutDown ) {
+ return;
+ }
+ while( !pending ) {
+ mutex.wait();
+ }
+ }
+ }
+
+ }
+ }
+ AMQ_CATCH_NOTHROW( Exception )
+ AMQ_CATCHALL_NOTHROW()
+
+ // Make sure we notify any waiting threads that thread
+ // has terminated.
+ synchronized( &mutex ) {
+ threadTerminated = true;
+ mutex.notifyAll();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::addTask( CompositeTask* task ) {
+
+ if( task != NULL ) {
+
+ synchronized( &tasks ) {
+ this->tasks.add( task );
+ this->wakeup();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::removeTask( CompositeTask* task ) {
+
+ if( task != NULL ) {
+
+ synchronized( &tasks ) {
+ this->tasks.remove( task );
+ this->wakeup();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool CompositeTaskRunner::iterate() {
+
+ synchronized( &tasks ) {
+
+ auto_ptr< Iterator<CompositeTask*> > iter( tasks.iterator() );
+
+ while( iter->hasNext() ) {
+
+ CompositeTask* task = iter->next();
+
+ if( task->isPending() ) {
+ task->iterate();
+
+ // Always return true, so that we check again for
+ // any of the other tasks that might now be pending.
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/TaskRunner.h>
+#include <activemq/threads/CompositeTask.h>
+#include <decaf/util/StlSet.h>
+#include <decaf/util/StlList.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace threads {
+
+ using decaf::lang::Pointer;
+
+ /**
+ * A Task Runner that can contain one or more CompositeTasks that are each checked
+ * for pending work and run if any is present in the order that the tasks were added.
+ *
+ * @since 3.0
+ */
+ class AMQCPP_API CompositeTaskRunner : public activemq::threads::TaskRunner,
+ public activemq::threads::Task,
+ public decaf::lang::Runnable {
+ private:
+
+ decaf::util::StlList<CompositeTask*> tasks;
+ decaf::util::concurrent::Mutex mutex;
+
+ Pointer<decaf::lang::Thread> thread;
+
+ bool threadTerminated;
+ bool pending;
+ bool shutDown;
+
+ public:
+
+ CompositeTaskRunner();
+
+ virtual ~CompositeTaskRunner();
+
+ /**
+ * Adds a new CompositeTask to the Set of Tasks that this class manages.
+ * @param task - Pointer to a CompositeTask instance.
+ */
+ void addTask( CompositeTask* task );
+
+ /**
+ * Removes a CompositeTask that was added previously
+ * @param task - Pointer to a CompositeTask instance.
+ */
+ void removeTask( CompositeTask* task );
+
+ /**
+ * Shutdown after a timeout, does not guarantee that the task's iterate
+ * method has completed and the thread halted.
+ *
+ * @param timeout - Time in Milliseconds to wait for the task to stop.
+ */
+ virtual void shutdown( unsigned int timeout );
+
+ /**
+ * Shutdown once the task has finished and the TaskRunner's thread has exited.
+ */
+ virtual void shutdown();
+
+ /**
+ * Signal the TaskRunner to wakeup and execute another iteration cycle on
+ * the task, the Task instance will be run until its iterate method has
+ * returned false indicating it is done.
+ */
+ virtual void wakeup();
+
+ protected:
+
+ virtual void run();
+
+ virtual bool iterate();
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_ */
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DedicatedTaskRunner.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+DedicatedTaskRunner::DedicatedTaskRunner( Task* task ) : task( task ) {
+
+ if( this->task == NULL ) {
+ throw NullPointerException(
+ __FILE__, __LINE__, "Task passed was null" );
+ }
+
+ this->threadTerminated = false;
+ this->pending = false;
+ this->shutDown = false;
+
+ this->thread.reset( new Thread( this ) );
+ this->thread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DedicatedTaskRunner::~DedicatedTaskRunner() {
+ try{
+ this->shutdown();
+ }
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::shutdown( unsigned int timeout ) {
+
+ synchronized( &mutex ) {
+ shutDown = true;
+ pending = true;
+ mutex.notifyAll();
+
+ // Wait till the thread stops ( no need to wait if shutdown
+ // is called from thread that is shutting down)
+ if( !threadTerminated ) {
+ mutex.wait( timeout );
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::shutdown() {
+
+ synchronized( &mutex ) {
+ shutDown = true;
+ pending = true;
+ mutex.notifyAll();
+ }
+
+ // Wait till the thread stops
+ if( !threadTerminated ) {
+ this->thread->join();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::wakeup() {
+
+ synchronized( &mutex ) {
+ if( shutDown) {
+ return;
+ }
+ pending = true;
+ mutex.notifyAll();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::run() {
+
+ try {
+
+ while( true ) {
+
+ synchronized( &mutex ) {
+ pending = false;
+ if( shutDown ) {
+ return;
+ }
+ }
+
+ if( !this->task->iterate() ) {
+
+ // wait to be notified.
+ synchronized( &mutex ) {
+ if( shutDown ) {
+ return;
+ }
+ while( !pending ) {
+ mutex.wait();
+ }
+ }
+ }
+
+ }
+ }
+ AMQ_CATCH_NOTHROW( Exception )
+ AMQ_CATCHALL_NOTHROW()
+
+ // Make sure we notify any waiting threads that thread
+ // has terminated.
+ synchronized( &mutex ) {
+ threadTerminated = true;
+ mutex.notifyAll();
+ }
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/TaskRunner.h>
+#include <activemq/threads/Task.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace threads {
+
+ using decaf::lang::Pointer;
+
+ class AMQCPP_API DedicatedTaskRunner : public TaskRunner,
+ public decaf::lang::Runnable {
+ private:
+
+ decaf::util::concurrent::Mutex mutex;
+
+ Pointer<decaf::lang::Thread> thread;
+
+ bool threadTerminated;
+ bool pending;
+ bool shutDown;
+
+ Task* task;
+
+ public:
+
+ DedicatedTaskRunner( Task* task );
+ virtual ~DedicatedTaskRunner();
+
+ /**
+ * Shutdown after a timeout, does not guarantee that the task's iterate
+ * method has completed and the thread halted.
+ *
+ * @param timeout - Time in Milliseconds to wait for the task to stop.
+ */
+ virtual void shutdown( unsigned int timeout );
+
+ /**
+ * Shutdown once the task has finished and the TaskRunner's thread has exited.
+ */
+ virtual void shutdown();
+
+ /**
+ * Signal the TaskRunner to wakeup and execute another iteration cycle on
+ * the task, the Task instance will be run until its iterate method has
+ * returned false indicating it is done.
+ */
+ virtual void wakeup();
+
+ protected:
+
+ virtual void run();
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_*/
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h (from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h?p2=activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h&p1=activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h Tue Mar 24 15:23:52 2009
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-#ifndef _ACTIVEMQ_UTIL_TASK_H_
-#define _ACTIVEMQ_UTIL_TASK_H_
+#ifndef _ACTIVEMQ_THREADS_TASK_H_
+#define _ACTIVEMQ_THREADS_TASK_H_
#include <activemq/util/Config.h>
namespace activemq {
-namespace util {
+namespace threads {
/**
* Represents a unit of work that requires one or more iterations to complete.
@@ -47,4 +47,4 @@
}}
-#endif /* _ACTIVEMQ_UTIL_TASK_H_ */
+#endif /* _ACTIVEMQ_THREADS_TASK_H_ */
Copied: activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h (from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h?p2=activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h&p1=activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h Tue Mar 24 15:23:52 2009
@@ -15,38 +15,18 @@
* limitations under the License.
*/
-#ifndef _ACTIVEMQ_UTIL_TASKRUNNER_H_
-#define _ACTIVEMQ_UTIL_TASKRUNNER_H_
+#ifndef _ACTIVEMQ_THREADS_TASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_TASKRUNNER_H_
-#include <activemq/util/Task.h>
-
-#include <decaf/lang/Thread.h>
-#include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/lang/Pointer.h>
+#include <activemq/threads/Task.h>
namespace activemq {
-namespace util {
-
- using decaf::lang::Pointer;
-
- class TaskRunner : public decaf::lang::Runnable {
- private:
-
- decaf::util::concurrent::Mutex mutex;
-
- Pointer<decaf::lang::Thread> thread;
-
- bool threadTerminated;
- bool pending;
- bool shutDown;
-
- Task* task;
+namespace threads {
+ class TaskRunner {
public:
- TaskRunner( Task* task );
- virtual ~TaskRunner();
+ virtual ~TaskRunner() {}
/**
* Shutdown after a timeout, does not guarantee that the task's iterate
@@ -54,26 +34,22 @@
*
* @param timeout - Time in Milliseconds to wait for the task to stop.
*/
- void shutdown( unsigned int timeout );
+ virtual void shutdown( unsigned int timeout ) = 0;
/**
* Shutdown once the task has finished and the TaskRunner's thread has exited.
*/
- void shutdown();
+ virtual void shutdown() = 0;
/**
* Signal the TaskRunner to wakeup and execute another iteration cycle on
* the task, the Task instance will be run until its iterate method has
* returned false indicating it is done.
*/
- void wakeup();
-
- protected:
-
- virtual void run();
+ virtual void wakeup() = 0;
};
}}
-#endif /*_ACTIVEMQ_UTIL_TASKRUNNER_H_*/
+#endif /*_ACTIVEMQ_THREADS_TASKRUNNER_H_*/
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Tue Mar 24 15:23:52 2009
@@ -21,6 +21,7 @@
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/transport/TransportRegistry.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
#include <decaf/util/Random.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>
@@ -65,7 +66,7 @@
this->stateTracker.setTrackTransactions( true );
this->myTransportListener.reset( new FailoverTransportListener( this ) );
this->reconnectTask.reset( new ReconnectTask( this ) );
- this->taskRunner.reset( new TaskRunner( reconnectTask.get() ) );
+ this->taskRunner.reset( new DedicatedTaskRunner( reconnectTask.get() ) );
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Tue Mar 24 15:23:52 2009
@@ -21,7 +21,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/Command.h>
-#include <activemq/util/TaskRunner.h>
+#include <activemq/threads/TaskRunner.h>
#include <activemq/state/ConnectionStateTracker.h>
#include <activemq/transport/CompositeTransport.h>
#include <activemq/transport/failover/BackupTransport.h>
@@ -44,7 +44,7 @@
using namespace decaf::lang;
using decaf::net::URI;
using namespace decaf::util;
- using namespace activemq::util;
+ using namespace activemq::threads;
using activemq::commands::Command;
using activemq::commands::Response;
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h Tue Mar 24 15:23:52 2009
@@ -20,7 +20,7 @@
#include <activemq/util/Config.h>
-#include <activemq/util/Task.h>
+#include <activemq/threads/Task.h>
namespace activemq {
namespace transport {
@@ -28,7 +28,7 @@
class FailoverTransport;
- class AMQCPP_API ReconnectTask : public activemq::util::Task {
+ class AMQCPP_API ReconnectTask : public activemq::threads::Task {
private:
FailoverTransport* parent;
Modified: activemq/activemq-cpp/trunk/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/Makefile.am?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/test/Makefile.am Tue Mar 24 15:23:52 2009
@@ -95,7 +95,6 @@
activemq/cmsutil/CmsTemplateTest.cpp \
activemq/util/PrimitiveValueNodeTest.cpp \
activemq/util/LongSequenceGeneratorTest.cpp \
- activemq/util/TaskRunnerTest.cpp \
activemq/util/PrimitiveMapTest.cpp \
activemq/util/MemoryUsageTest.cpp \
activemq/util/URISupportTest.cpp \
@@ -107,6 +106,8 @@
activemq/state/SessionStateTest.cpp \
activemq/state/ConnectionStateTest.cpp \
activemq/exceptions/ActiveMQExceptionTest.cpp \
+ activemq/threads/DedicatedTaskRunnerTest.cpp \
+ activemq/threads/CompositeTaskRunnerTest.cpp \
activemq/core/ActiveMQSessionTest.cpp \
activemq/core/ActiveMQConnectionFactoryTest.cpp \
activemq/core/ActiveMQConnectionTest.cpp \
@@ -208,7 +209,6 @@
activemq/cmsutil/CmsAccessorTest.h \
activemq/cmsutil/DummyMessageCreator.h \
activemq/util/PrimitiveValueNodeTest.h \
- activemq/util/TaskRunnerTest.h \
activemq/util/PrimitiveMapTest.h \
activemq/util/PrimitiveListTest.h \
activemq/util/URISupportTest.h \
@@ -221,6 +221,8 @@
activemq/state/ConsumerStateTest.h \
activemq/state/SessionStateTest.h \
activemq/exceptions/ActiveMQExceptionTest.h \
+ activemq/threads/CompositeTaskRunnerTest.h \
+ activemq/threads/DedicatedTaskRunnerTest.h \
activemq/core/ActiveMQConnectionFactoryTest.h \
activemq/core/ActiveMQConnectionTest.h \
activemq/core/ActiveMQSessionTest.h \
Added: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp (added)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompositeTaskRunnerTest.h"
+
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
+#include <decaf/lang/Thread.h>
+
+#include <iostream>
+#include <iomanip>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::threads;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+class CountingTask : public CompositeTask {
+private:
+
+ int count;
+ int goal;
+ std::string name;
+
+public:
+
+ CountingTask( const std::string& name, int goal ) : count(0), goal(goal), name(name) {}
+
+ int getCount() const {
+ return count;
+ }
+
+ virtual bool isPending() const {
+ return count != goal;
+ }
+
+ virtual bool iterate() {
+ return !( ++count == goal );
+ }
+
+};
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunnerTest::test() {
+
+ int attempts = 0;
+
+ CompositeTaskRunner runner;
+
+ CountingTask task1( "task1", 100);
+ CountingTask task2( "task2", 200);
+
+ runner.addTask( &task1 );
+ runner.addTask( &task2 );
+
+ runner.wakeup();
+
+ while( attempts++ != 10 ) {
+
+ Thread::sleep( 1000 );
+
+ if( task1.getCount() == 100 && task2.getCount() == 200 ) {
+ break;
+ }
+ }
+
+ std::cout << "\n";
+ std::cout << "task1.count = " << task1.getCount() << std::endl;
+ std::cout << "task2.count = " << task2.getCount() << std::endl;
+
+ CPPUNIT_ASSERT( task1.getCount() == 100 );
+ CPPUNIT_ASSERT( task2.getCount() == 200 );
+
+ runner.removeTask( &task1 );
+ runner.removeTask( &task2 );
+
+}
+
Propchange: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h (added)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace threads {
+
+ class CompositeTaskRunnerTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( CompositeTaskRunnerTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ CompositeTaskRunnerTest() {}
+ virtual ~CompositeTaskRunnerTest() {}
+
+ void test();
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_ */
Propchange: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp (from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp?p2=activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp&p1=activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp Tue Mar 24 15:23:52 2009
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-#include "TaskRunnerTest.h"
+#include "DedicatedTaskRunnerTest.h"
#include <memory>
-#include <activemq/util/Task.h>
-#include <activemq/util/TaskRunner.h>
+#include <activemq/threads/Task.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/exceptions/NullPointerException.h>
using namespace activemq;
-using namespace activemq::util;
+using namespace activemq::threads;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
@@ -71,16 +71,16 @@
};
////////////////////////////////////////////////////////////////////////////////
-void TaskRunnerTest::testSimple() {
+void DedicatedTaskRunnerTest::testSimple() {
CPPUNIT_ASSERT_THROW_MESSAGE(
"Should throw a NullPointerException",
- std::auto_ptr<TaskRunner>( new TaskRunner( NULL ) ),
+ std::auto_ptr<TaskRunner>( new DedicatedTaskRunner( NULL ) ),
NullPointerException );
SimpleCountingTask simpleTask;
CPPUNIT_ASSERT( simpleTask.getCount() == 0 );
- TaskRunner simpleTaskRunner( &simpleTask );
+ DedicatedTaskRunner simpleTaskRunner( &simpleTask );
simpleTaskRunner.wakeup();
Thread::sleep( 250 );
@@ -91,11 +91,11 @@
InfiniteCountingTask infiniteTask;
CPPUNIT_ASSERT( infiniteTask.getCount() == 0 );
- TaskRunner infiniteTaskRunner( &infiniteTask );
+ DedicatedTaskRunner infiniteTaskRunner( &infiniteTask );
Thread::sleep( 500 );
CPPUNIT_ASSERT( infiniteTask.getCount() != 0 );
infiniteTaskRunner.shutdown();
- int count = infiniteTask.getCount();
+ unsigned int count = infiniteTask.getCount();
Thread::sleep( 250 );
CPPUNIT_ASSERT( infiniteTask.getCount() == count );
Copied: activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h (from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h?p2=activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h&p1=activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h Tue Mar 24 15:23:52 2009
@@ -15,25 +15,25 @@
* limitations under the License.
*/
-#ifndef _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_
-#define _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_
+#ifndef _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_
+#define _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
namespace activemq {
-namespace util {
+namespace threads {
- class TaskRunnerTest : public CppUnit::TestFixture {
+ class DedicatedTaskRunnerTest : public CppUnit::TestFixture {
- CPPUNIT_TEST_SUITE( TaskRunnerTest );
+ CPPUNIT_TEST_SUITE( DedicatedTaskRunnerTest );
CPPUNIT_TEST( testSimple );
CPPUNIT_TEST_SUITE_END();
public:
- TaskRunnerTest() {}
- virtual ~TaskRunnerTest() {}
+ DedicatedTaskRunnerTest() {}
+ virtual ~DedicatedTaskRunnerTest() {}
void testSimple();
@@ -41,4 +41,4 @@
}}
-#endif /* _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_ */
+#endif /* _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_ */
Modified: activemq/activemq-cpp/trunk/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/testRegistry.cpp?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/testRegistry.cpp Tue Mar 24 15:23:52 2009
@@ -118,8 +118,11 @@
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::URISupportTest );
#include <activemq/util/MemoryUsageTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::MemoryUsageTest );
-#include <activemq/util/TaskRunnerTest.h>
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::TaskRunnerTest );
+
+#include <activemq/threads/DedicatedTaskRunnerTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::threads::DedicatedTaskRunnerTest );
+#include <activemq/threads/CompositeTaskRunnerTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::threads::CompositeTaskRunnerTest );
#include <activemq/wireformat/WireFormatRegistryTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::wireformat::WireFormatRegistryTest );