You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/24 16:23:59 UTC

svn commit: r757843 - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/threads/ main/activemq/transport/failover/ main/activemq/util/ test/ test/activemq/threads/ test/activemq/util/

Author: tabish
Date: Tue Mar 24 15:23:52 2009
New Revision: 757843

URL: http://svn.apache.org/viewvc?rev=757843&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

Refactored TaskRunner classes to add a composite task runner and a dedicated task runner for use by the failover transport.  The dedicated task runner will be used to close transports out of band to avoid deadlock when a transport is closed as a result of a call to the onException method from the inner thread of the IOTransport.

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/threads/
    activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h
      - copied, changed from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h
    activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h
      - copied, changed from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h
    activemq/activemq-cpp/trunk/src/test/activemq/threads/
    activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp   (with props)
    activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h   (with props)
    activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp
      - copied, changed from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h
      - copied, changed from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h
Removed:
    activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h
    activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h
    activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
    activemq/activemq-cpp/trunk/src/test/Makefile.am
    activemq/activemq-cpp/trunk/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Tue Mar 24 15:23:52 2009
@@ -314,7 +314,6 @@
     activemq/util/PrimitiveValueNode.cpp \
     activemq/util/CompositeData.cpp \
     activemq/util/MemoryUsage.cpp \
-    activemq/util/TaskRunner.cpp \
     activemq/util/LongSequenceGenerator.cpp \
     activemq/util/PrimitiveList.cpp \
     activemq/state/ConnectionStateTracker.cpp \
@@ -325,6 +324,8 @@
     activemq/state/ConnectionState.cpp \
     activemq/state/ProducerState.cpp \
     activemq/exceptions/ActiveMQException.cpp \
+    activemq/threads/DedicatedTaskRunner.cpp \
+    activemq/threads/CompositeTaskRunner.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
     activemq/core/ActiveMQConstants.cpp \
     activemq/core/ActiveMQConnectionMetaData.cpp \
@@ -847,9 +848,7 @@
     activemq/util/CompositeData.h \
     activemq/util/Usage.h \
     activemq/util/LongSequenceGenerator.h \
-    activemq/util/TaskRunner.h \
     activemq/util/Config.h \
-    activemq/util/Task.h \
     activemq/util/PrimitiveList.h \
     activemq/util/PrimitiveMap.h \
     activemq/state/CommandVisitor.h \
@@ -864,6 +863,11 @@
     activemq/exceptions/ExceptionDefines.h \
     activemq/exceptions/ActiveMQException.h \
     activemq/exceptions/BrokerException.h \
+    activemq/threads/CompositeTaskRunner.h \
+    activemq/threads/DedicatedTaskRunner.h \
+    activemq/threads/CompositeTask.h \
+    activemq/threads/TaskRunner.h \
+    activemq/threads/Task.h \
     activemq/core/ActiveMQConsumer.h \
     activemq/core/ActiveMQConnection.h \
     activemq/core/ActiveMQConstants.h \

Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASK_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASK_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/Task.h>
+
+namespace activemq {
+namespace threads {
+
+    /**
+     * Represents a single task that can be part of a set of Tasks that are contained
+     * in a <code>CompositeTaskRunner</code>.
+     *
+     * @since 3.0
+     */
+    class AMQCPP_API CompositeTask : public activemq::threads::Task {
+    public:
+
+        virtual ~CompositeTask() {}
+
+        /**
+         * Indicates whether this task has any pending work that needs to be
+         * done.  If not, it is skipped and the next Task in the
+         * CompositeTaskRunner's list of tasks is checked.  If none of the tasks
+         * has any pending work to do, the runner can go to sleep until it is
+         * awakened by a call to <code>wakeup</code>.
+         *
+         * @since 3.0
+         */
+        virtual bool isPending() const = 0;
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASK_H_ */

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTask.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompositeTaskRunner.h"
+
+#include <memory>
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTaskRunner::CompositeTaskRunner() {
+
+    this->threadTerminated = false;
+    this->pending = false;
+    this->shutDown = false;
+
+    this->thread.reset( new Thread( this ) );
+    this->thread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTaskRunner::~CompositeTaskRunner() {
+    try{
+        this->shutdown();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::shutdown( unsigned int timeout ) {
+
+    synchronized( &mutex ) {
+        shutDown = true;
+        pending = true;
+        mutex.notifyAll();
+
+        // Wait till the thread stops ( no need to wait if shutdown
+        // is called from thread that is shutting down)
+        if( !threadTerminated ) {
+            mutex.wait( timeout );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::shutdown() {
+
+    synchronized( &mutex ) {
+        shutDown = true;
+        pending = true;
+        mutex.notifyAll();
+    }
+
+    // Wait till the thread stops
+    if( !threadTerminated ) {
+        this->thread->join();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::wakeup() {
+
+    synchronized( &mutex ) {
+        if( shutDown) {
+            return;
+        }
+        pending = true;
+        mutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::run() {
+
+    try {
+
+        while( true ) {
+
+            synchronized( &mutex ) {
+                pending = false;
+                if( shutDown ) {
+                    return;
+                }
+            }
+
+            if( !this->iterate() ) {
+
+                // wait to be notified.
+                synchronized( &mutex ) {
+                    if( shutDown ) {
+                        return;
+                    }
+                    while( !pending ) {
+                        mutex.wait();
+                    }
+                }
+            }
+
+        }
+    }
+    AMQ_CATCH_NOTHROW( Exception )
+    AMQ_CATCHALL_NOTHROW()
+
+    // Make sure we notify any waiting threads that thread
+    // has terminated.
+    synchronized( &mutex ) {
+        threadTerminated = true;
+        mutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::addTask( CompositeTask* task ) {
+
+    if( task != NULL ) {
+
+        synchronized( &tasks ) {
+            this->tasks.add( task );
+            this->wakeup();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::removeTask( CompositeTask* task ) {
+
+    if( task != NULL ) {
+
+        synchronized( &tasks ) {
+            this->tasks.remove( task );
+            this->wakeup();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool CompositeTaskRunner::iterate() {
+
+    synchronized( &tasks ) {
+
+        auto_ptr< Iterator<CompositeTask*> > iter( tasks.iterator() );
+
+        while( iter->hasNext() ) {
+
+            CompositeTask* task = iter->next();
+
+            if( task->isPending() ) {
+                task->iterate();
+
+                // Always return true, so that we check again for
+                // any of the other tasks that might now be pending.
+                return true;
+            }
+        }
+    }
+
+    return false;
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/TaskRunner.h>
+#include <activemq/threads/CompositeTask.h>
+#include <decaf/util/StlSet.h>
+#include <decaf/util/StlList.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace threads {
+
+    using decaf::lang::Pointer;
+
+    /**
+     * A Task Runner that can contain one or more CompositeTasks that are each checked
+     * for pending work and run if any is present in the order that the tasks were added.
+     *
+     * @since 3.0
+     */
+    class AMQCPP_API CompositeTaskRunner : public activemq::threads::TaskRunner,
+                                           public activemq::threads::Task,
+                                           public decaf::lang::Runnable {
+    private:
+
+        decaf::util::StlList<CompositeTask*> tasks;
+        decaf::util::concurrent::Mutex mutex;
+
+        Pointer<decaf::lang::Thread> thread;
+
+        bool threadTerminated;
+        bool pending;
+        bool shutDown;
+
+    public:
+
+        CompositeTaskRunner();
+
+        virtual ~CompositeTaskRunner();
+
+        /**
+         * Adds a new CompositeTask to the list of Tasks that this class manages.
+         * @param task - Pointer to a CompositeTask instance.
+         */
+        void addTask( CompositeTask* task );
+
+        /**
+         * Removes a CompositeTask that was added previously
+         * @param task - Pointer to a CompositeTask instance.
+         */
+        void removeTask( CompositeTask* task );
+
+        /**
+         * Shutdown after a timeout, does not guarantee that the task's iterate
+         * method has completed and the thread halted.
+         *
+         * @param timeout - Time in Milliseconds to wait for the task to stop.
+         */
+        virtual void shutdown( unsigned int timeout );
+
+        /**
+         * Shutdown once the task has finished and the TaskRunner's thread has exited.
+         */
+        virtual void shutdown();
+
+        /**
+         * Signal the TaskRunner to wake up and execute another iteration cycle over
+         * its tasks.  Iteration cycles repeat until one finds no task with pending
+         * work, at which point the runner waits until it is notified again.
+         */
+        virtual void wakeup();
+
+    protected:
+
+        virtual void run();
+
+        virtual bool iterate();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASKRUNNER_H_ */

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/CompositeTaskRunner.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DedicatedTaskRunner.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+DedicatedTaskRunner::DedicatedTaskRunner( Task* task ) : task( task ) {
+
+    if( this->task == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Task passed was null" );
+    }
+
+    this->threadTerminated = false;
+    this->pending = false;
+    this->shutDown = false;
+
+    this->thread.reset( new Thread( this ) );
+    this->thread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DedicatedTaskRunner::~DedicatedTaskRunner() {
+    try{
+        this->shutdown();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::shutdown( unsigned int timeout ) {
+
+    synchronized( &mutex ) {
+        shutDown = true;
+        pending = true;
+        mutex.notifyAll();
+
+        // Wait till the thread stops ( no need to wait if shutdown
+        // is called from thread that is shutting down)
+        if( !threadTerminated ) {
+            mutex.wait( timeout );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::shutdown() {
+
+    synchronized( &mutex ) {
+        shutDown = true;
+        pending = true;
+        mutex.notifyAll();
+    }
+
+    // Wait till the thread stops
+    if( !threadTerminated ) {
+        this->thread->join();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::wakeup() {
+
+    synchronized( &mutex ) {
+        if( shutDown) {
+            return;
+        }
+        pending = true;
+        mutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::run() {
+
+    try {
+
+        while( true ) {
+
+            synchronized( &mutex ) {
+                pending = false;
+                if( shutDown ) {
+                    return;
+                }
+            }
+
+            if( !this->task->iterate() ) {
+
+                // wait to be notified.
+                synchronized( &mutex ) {
+                    if( shutDown ) {
+                        return;
+                    }
+                    while( !pending ) {
+                        mutex.wait();
+                    }
+                }
+            }
+
+        }
+    }
+    AMQ_CATCH_NOTHROW( Exception )
+    AMQ_CATCHALL_NOTHROW()
+
+    // Make sure we notify any waiting threads that thread
+    // has terminated.
+    synchronized( &mutex ) {
+        threadTerminated = true;
+        mutex.notifyAll();
+    }
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/TaskRunner.h>
+#include <activemq/threads/Task.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace threads {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API DedicatedTaskRunner : public TaskRunner,
+                                           public decaf::lang::Runnable {
+    private:
+
+        decaf::util::concurrent::Mutex mutex;
+
+        Pointer<decaf::lang::Thread> thread;
+
+        bool threadTerminated;
+        bool pending;
+        bool shutDown;
+
+        Task* task;
+
+    public:
+
+        DedicatedTaskRunner( Task* task );
+        virtual ~DedicatedTaskRunner();
+
+        /**
+         * Shutdown after a timeout, does not guarantee that the task's iterate
+         * method has completed and the thread halted.
+         *
+         * @param timeout - Time in Milliseconds to wait for the task to stop.
+         */
+        virtual void shutdown( unsigned int timeout );
+
+        /**
+         * Shutdown once the task has finished and the TaskRunner's thread has exited.
+         */
+        virtual void shutdown();
+
+        /**
+         * Signal the TaskRunner to wake up and execute another iteration cycle on
+         * the task.  The Task instance will be run until its iterate method has
+         * returned false, indicating it is done.
+         */
+        virtual void wakeup();
+
+    protected:
+
+        virtual void run();
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_THREADS_DEDICATEDTASKRUNNER_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/threads/DedicatedTaskRunner.h
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h (from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h?p2=activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h&p1=activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/util/Task.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/Task.h Tue Mar 24 15:23:52 2009
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-#ifndef _ACTIVEMQ_UTIL_TASK_H_
-#define _ACTIVEMQ_UTIL_TASK_H_
+#ifndef _ACTIVEMQ_THREADS_TASK_H_
+#define _ACTIVEMQ_THREADS_TASK_H_
 
 #include <activemq/util/Config.h>
 
 namespace activemq {
-namespace util {
+namespace threads {
 
     /**
      * Represents a unit of work that requires one or more iterations to complete.
@@ -47,4 +47,4 @@
 
 }}
 
-#endif /* _ACTIVEMQ_UTIL_TASK_H_ */
+#endif /* _ACTIVEMQ_THREADS_TASK_H_ */

Copied: activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h (from r757399, activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h?p2=activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h&p1=activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/util/TaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/threads/TaskRunner.h Tue Mar 24 15:23:52 2009
@@ -15,38 +15,18 @@
  * limitations under the License.
  */
 
-#ifndef _ACTIVEMQ_UTIL_TASKRUNNER_H_
-#define _ACTIVEMQ_UTIL_TASKRUNNER_H_
+#ifndef _ACTIVEMQ_THREADS_TASKRUNNER_H_
+#define _ACTIVEMQ_THREADS_TASKRUNNER_H_
 
-#include <activemq/util/Task.h>
-
-#include <decaf/lang/Thread.h>
-#include <decaf/lang/Runnable.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/lang/Pointer.h>
+#include <activemq/threads/Task.h>
 
 namespace activemq {
-namespace util {
-
-    using decaf::lang::Pointer;
-
-    class TaskRunner : public decaf::lang::Runnable {
-    private:
-
-        decaf::util::concurrent::Mutex mutex;
-
-        Pointer<decaf::lang::Thread> thread;
-
-        bool threadTerminated;
-        bool pending;
-        bool shutDown;
-
-        Task* task;
+namespace threads {
 
+    class TaskRunner {
     public:
 
-        TaskRunner( Task* task );
-        virtual ~TaskRunner();
+        virtual ~TaskRunner() {}
 
         /**
          * Shutdown after a timeout, does not guarantee that the task's iterate
@@ -54,26 +34,22 @@
          *
          * @param timeout - Time in Milliseconds to wait for the task to stop.
          */
-        void shutdown( unsigned int timeout );
+        virtual void shutdown( unsigned int timeout ) = 0;
 
         /**
          * Shutdown once the task has finished and the TaskRunner's thread has exited.
          */
-        void shutdown();
+        virtual void shutdown() = 0;
 
         /**
          * Signal the TaskRunner to wakeup and execute another iteration cycle on
          * the task, the Task instance will be run until its iterate method has
          * returned false indicating it is done.
          */
-        void wakeup();
-
-    protected:
-
-        virtual void run();
+        virtual void wakeup() = 0;
 
     };
 
 }}
 
-#endif /*_ACTIVEMQ_UTIL_TASKRUNNER_H_*/
+#endif /*_ACTIVEMQ_THREADS_TASKRUNNER_H_*/

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Tue Mar 24 15:23:52 2009
@@ -21,6 +21,7 @@
 #include <activemq/commands/ShutdownInfo.h>
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/transport/TransportRegistry.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
 #include <decaf/util/Random.h>
 #include <decaf/lang/System.h>
 #include <decaf/lang/Integer.h>
@@ -65,7 +66,7 @@
     this->stateTracker.setTrackTransactions( true );
     this->myTransportListener.reset( new FailoverTransportListener( this ) );
     this->reconnectTask.reset( new ReconnectTask( this ) );
-    this->taskRunner.reset( new TaskRunner( reconnectTask.get() ) );
+    this->taskRunner.reset( new DedicatedTaskRunner( reconnectTask.get() ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Tue Mar 24 15:23:52 2009
@@ -21,7 +21,7 @@
 #include <activemq/util/Config.h>
 
 #include <activemq/commands/Command.h>
-#include <activemq/util/TaskRunner.h>
+#include <activemq/threads/TaskRunner.h>
 #include <activemq/state/ConnectionStateTracker.h>
 #include <activemq/transport/CompositeTransport.h>
 #include <activemq/transport/failover/BackupTransport.h>
@@ -44,7 +44,7 @@
     using namespace decaf::lang;
     using decaf::net::URI;
     using namespace decaf::util;
-    using namespace activemq::util;
+    using namespace activemq::threads;
     using activemq::commands::Command;
     using activemq::commands::Response;
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h Tue Mar 24 15:23:52 2009
@@ -20,7 +20,7 @@
 
 #include <activemq/util/Config.h>
 
-#include <activemq/util/Task.h>
+#include <activemq/threads/Task.h>
 
 namespace activemq {
 namespace transport {
@@ -28,7 +28,7 @@
 
     class FailoverTransport;
 
-    class AMQCPP_API ReconnectTask : public activemq::util::Task {
+    class AMQCPP_API ReconnectTask : public activemq::threads::Task {
     private:
 
         FailoverTransport* parent;

Modified: activemq/activemq-cpp/trunk/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/Makefile.am?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/test/Makefile.am Tue Mar 24 15:23:52 2009
@@ -95,7 +95,6 @@
     activemq/cmsutil/CmsTemplateTest.cpp \
     activemq/util/PrimitiveValueNodeTest.cpp \
     activemq/util/LongSequenceGeneratorTest.cpp \
-    activemq/util/TaskRunnerTest.cpp \
     activemq/util/PrimitiveMapTest.cpp \
     activemq/util/MemoryUsageTest.cpp \
     activemq/util/URISupportTest.cpp \
@@ -107,6 +106,8 @@
     activemq/state/SessionStateTest.cpp \
     activemq/state/ConnectionStateTest.cpp \
     activemq/exceptions/ActiveMQExceptionTest.cpp \
+    activemq/threads/DedicatedTaskRunnerTest.cpp \
+    activemq/threads/CompositeTaskRunnerTest.cpp \
     activemq/core/ActiveMQSessionTest.cpp \
     activemq/core/ActiveMQConnectionFactoryTest.cpp \
     activemq/core/ActiveMQConnectionTest.cpp \
@@ -208,7 +209,6 @@
     activemq/cmsutil/CmsAccessorTest.h \
     activemq/cmsutil/DummyMessageCreator.h \
     activemq/util/PrimitiveValueNodeTest.h \
-    activemq/util/TaskRunnerTest.h \
     activemq/util/PrimitiveMapTest.h \
     activemq/util/PrimitiveListTest.h \
     activemq/util/URISupportTest.h \
@@ -221,6 +221,8 @@
     activemq/state/ConsumerStateTest.h \
     activemq/state/SessionStateTest.h \
     activemq/exceptions/ActiveMQExceptionTest.h \
+    activemq/threads/CompositeTaskRunnerTest.h \
+    activemq/threads/DedicatedTaskRunnerTest.h \
     activemq/core/ActiveMQConnectionFactoryTest.h \
     activemq/core/ActiveMQConnectionTest.h \
     activemq/core/ActiveMQSessionTest.h \

Added: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp (added)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp Tue Mar 24 15:23:52 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompositeTaskRunnerTest.h"
+
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
+#include <decaf/lang/Thread.h>
+
+#include <iostream>
+#include <iomanip>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::threads;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+class CountingTask : public CompositeTask {  // Test task that counts its iterate() calls up to a fixed goal.
+private:
+    // count: iterate() calls made so far; goal: the count at which the task stops being pending.
+    int count;
+    int goal;
+    std::string name;  // Label given at construction; stored but never read in this class.
+
+public:
+    // Creates a task labelled 'name' whose count starts at zero and whose target count is 'goal'.
+    CountingTask( const std::string& name, int goal ) : count(0), goal(goal), name(name) {}
+    // Returns how many times iterate() has been called.
+    int getCount() const {
+        return count;
+    }
+    // Returns true while count differs from goal.
+    virtual bool isPending() const {
+        return count != goal;
+    }
+    // Increments count; returns false on the call that reaches goal and true on every other call.
+    virtual bool iterate() {
+        return !( ++count == goal );
+    }
+
+};
+
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunnerTest::test() {
+    // Adds two counting tasks to a CompositeTaskRunner and checks that both reach their goals.
+    int attempts = 0;
+    // The tasks are constructed before the runner so that they are destroyed after it.
+    CountingTask task1( "task1", 100);
+    CountingTask task2( "task2", 200);
+    // NOTE(review): assumes ~CompositeTaskRunner stops its worker before returning - confirm.
+    CompositeTaskRunner runner;
+
+    runner.addTask( &task1 );
+    runner.addTask( &task2 );
+
+    runner.wakeup();
+    // Poll up to ten times, sleeping 1000 (presumably milliseconds) before each check.
+    while( attempts++ != 10 ) {
+
+        Thread::sleep( 1000 );
+
+        if( task1.getCount() == 100 && task2.getCount() == 200 ) {
+            break;
+        }
+    }
+
+    std::cout << "\n";
+    std::cout << "task1.count = " << task1.getCount() << std::endl;
+    std::cout << "task2.count = " << task2.getCount() << std::endl;
+    // A failed assertion throws and skips removeTask(); the declaration order above covers that path.
+    CPPUNIT_ASSERT( task1.getCount() == 100 );
+    CPPUNIT_ASSERT( task2.getCount() == 200 );
+
+    runner.removeTask( &task1 );
+    runner.removeTask( &task2 );
+
+}
+

Propchange: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h?rev=757843&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h (added)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h Tue Mar 24 15:23:52 2009
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_
+#define _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace threads {
+
+    class CompositeTaskRunnerTest : public CppUnit::TestFixture {
+        // CppUnit fixture for CompositeTaskRunner; the suite below lists test() as its only case.
+        CPPUNIT_TEST_SUITE( CompositeTaskRunnerTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        CompositeTaskRunnerTest() {}
+        virtual ~CompositeTaskRunnerTest() {}
+        // Test case body; defined in CompositeTaskRunnerTest.cpp.
+        void test();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_THREADS_COMPOSITETASKRUNNERTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/src/test/activemq/threads/CompositeTaskRunnerTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp (from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp?p2=activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp&p1=activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.cpp Tue Mar 24 15:23:52 2009
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-#include "TaskRunnerTest.h"
+#include "DedicatedTaskRunnerTest.h"
 
 #include <memory>
 
-#include <activemq/util/Task.h>
-#include <activemq/util/TaskRunner.h>
+#include <activemq/threads/Task.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
 
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 
 using namespace activemq;
-using namespace activemq::util;
+using namespace activemq::threads;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
@@ -71,16 +71,16 @@
 };
 
 ////////////////////////////////////////////////////////////////////////////////
-void TaskRunnerTest::testSimple() {
+void DedicatedTaskRunnerTest::testSimple() {
 
     CPPUNIT_ASSERT_THROW_MESSAGE(
         "Should throw a NullPointerException",
-        std::auto_ptr<TaskRunner>( new TaskRunner( NULL ) ),
+        std::auto_ptr<TaskRunner>( new DedicatedTaskRunner( NULL ) ),
         NullPointerException );
 
     SimpleCountingTask simpleTask;
     CPPUNIT_ASSERT( simpleTask.getCount() == 0 );
-    TaskRunner simpleTaskRunner( &simpleTask );
+    DedicatedTaskRunner simpleTaskRunner( &simpleTask );
 
     simpleTaskRunner.wakeup();
     Thread::sleep( 250 );
@@ -91,11 +91,11 @@
 
     InfiniteCountingTask infiniteTask;
     CPPUNIT_ASSERT( infiniteTask.getCount() == 0 );
-    TaskRunner infiniteTaskRunner( &infiniteTask );
+    DedicatedTaskRunner infiniteTaskRunner( &infiniteTask );
     Thread::sleep( 500 );
     CPPUNIT_ASSERT( infiniteTask.getCount() != 0 );
     infiniteTaskRunner.shutdown();
-    int count = infiniteTask.getCount();
+    unsigned int count = infiniteTask.getCount();
     Thread::sleep( 250 );
     CPPUNIT_ASSERT( infiniteTask.getCount() == count );
 

Copied: activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h (from r757399, activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h?p2=activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h&p1=activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h&r1=757399&r2=757843&rev=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/util/TaskRunnerTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/threads/DedicatedTaskRunnerTest.h Tue Mar 24 15:23:52 2009
@@ -15,25 +15,25 @@
  * limitations under the License.
  */
 
-#ifndef _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_
-#define _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_
+#ifndef _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_
+#define _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_
 
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 
 namespace activemq {
-namespace util {
+namespace threads {
 
-    class TaskRunnerTest : public CppUnit::TestFixture {
+    class DedicatedTaskRunnerTest : public CppUnit::TestFixture {
 
-        CPPUNIT_TEST_SUITE( TaskRunnerTest );
+        CPPUNIT_TEST_SUITE( DedicatedTaskRunnerTest );
         CPPUNIT_TEST( testSimple );
         CPPUNIT_TEST_SUITE_END();
 
     public:
 
-        TaskRunnerTest() {}
-        virtual ~TaskRunnerTest() {}
+        DedicatedTaskRunnerTest() {}
+        virtual ~DedicatedTaskRunnerTest() {}
 
         void testSimple();
 
@@ -41,4 +41,4 @@
 
 }}
 
-#endif /* _ACTIVEMQ_UTIL_TASKRUNNERTEST_H_ */
+#endif /* _ACTIVEMQ_THREADS_DEDICATEDTASKRUNNERTEST_H_ */

Modified: activemq/activemq-cpp/trunk/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/testRegistry.cpp?rev=757843&r1=757842&r2=757843&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/testRegistry.cpp Tue Mar 24 15:23:52 2009
@@ -118,8 +118,11 @@
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::URISupportTest );
 #include <activemq/util/MemoryUsageTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::MemoryUsageTest );
-#include <activemq/util/TaskRunnerTest.h>
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::util::TaskRunnerTest );
+
+#include <activemq/threads/DedicatedTaskRunnerTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::threads::DedicatedTaskRunnerTest );
+#include <activemq/threads/CompositeTaskRunnerTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::threads::CompositeTaskRunnerTest );
 
 #include <activemq/wireformat/WireFormatRegistryTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::wireformat::WireFormatRegistryTest );