You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/11 22:22:40 UTC

svn commit: r1091195 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/lang/ main/decaf/util/ main/decaf/util/concurrent/ main/decaf/util/concurrent/locks/ test/ test/decaf/util/concurrent/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Mon Apr 11 20:22:39 2011
@@ -18,10 +18,15 @@
 #define _DECAF_UTIL_CONCURRENT_THREADPOOLEXECUTOR_H_
 
 #include <decaf/lang/Runnable.h>
+#include <decaf/lang/Throwable.h>
 #include <decaf/util/concurrent/ThreadFactory.h>
 #include <decaf/util/concurrent/BlockingQueue.h>
 #include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/AbstractExecutorService.h>
+#include <decaf/util/concurrent/RejectedExecutionHandler.h>
+#include <decaf/util/concurrent/RejectedExecutionException.h>
 #include <decaf/util/LinkedList.h>
+#include <decaf/util/ArrayList.h>
 #include <decaf/util/Config.h>
 
 #include <vector>
@@ -50,7 +55,7 @@ namespace concurrent{
      * object that implements the <code>Runnable</code> interface and
      * one of the worker threads will executing it in its thread context.
      */
-    class DECAF_API ThreadPoolExecutor {
+    class DECAF_API ThreadPoolExecutor : public AbstractExecutorService {
     private:
 
         ThreadPoolExecutor( const ThreadPoolExecutor& );
@@ -93,48 +98,130 @@ namespace concurrent{
                            long long keepAliveTime, const TimeUnit& unit,
                            BlockingQueue<decaf::lang::Runnable*>* workQueue);
 
-        virtual ~ThreadPoolExecutor();
-
         /**
-         * Queue a task to be completed by one of the Pooled Threads at some point in the
-         * future.  The task can be rejected by this executor if it has been shut down or
-         * if the workQueue is full, rejected Runnables are not deleted by this executor.
-         * Upon successful return from this method the given Runnable pointer is considered
-         * to be owned by this Executor and will be deleted upon completion or shut down.
+         * Creates a new instance of a ThreadPoolExecutor.
          *
-         * @param task
-         *      The Runnable object that is to be executed.
+         * The executor instance is configured with the passed in parameters and a
+         * default thread Factory is used along with a default rejected execution
+         * handler.
+         *
+         * @param corePoolSize
+         *      The number of threads to pool regardless of their idle state.
+         * @param maxPoolSize
+         *      The maximum number of threads that will ever exist at one time in the pool.
+         * @param keepAliveTime
+         *      The maximum time to keep a thread in the pool for if the number of current
+         *      threads exceeds to core pool size.
+         * @param unit
+         *      The units that the keepAliveTime is specified in.
+         * @param workQueue
+         *      A BlockingQueue implementation that will be used to hold Runnable tasks
+         *      that are awaiting execution within this executor.  The Executor takes
+         *      ownership of the BlockingQueue instance passed once this method returns.
+         * @param handler
+         *      A RejectedExecutionHandler implementation that will be used to handle any
+         *      rejected tasks when they are submitted to this executor.  The Executor takes
+         *      ownership of the RejectedExecutionHandler instance passed once this method returns.
          *
-         * @throws RejectedExecutionException based on instruction from RejectedExecutionHandler
-         *         if the given task cannot be accepted for execution at this time.
-         * @throws NullPointerException - if command is null
+         * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative
+         *         or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize
+         *         is greater than maximumPoolSize.
+         * @throws NullPointerException if the workQueue pointer is NULL.
          */
-        virtual void execute(decaf::lang::Runnable* task);
+        ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                           long long keepAliveTime, const TimeUnit& unit,
+                           BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                           RejectedExecutionHandler* handler);
 
         /**
-         * Performs an orderly shutdown of this Executor.  Previously queued tasks are allowed
-         * to complete but no new tasks are accepted for execution.  Calling this method more
-         * than once has no affect on this executor.
+         * Creates a new instance of a ThreadPoolExecutor.
+         *
+         * The executor instance is configured with the passed in parameters and a
+         * default thread Factory is used along with a default rejected execution
+         * handler.
+         *
+         * @param corePoolSize
+         *      The number of threads to pool regardless of their idle state.
+         * @param maxPoolSize
+         *      The maximum number of threads that will ever exist at one time in the pool.
+         * @param keepAliveTime
+         *      The maximum time to keep a thread in the pool for if the number of current
+         *      threads exceeds to core pool size.
+         * @param unit
+         *      The units that the keepAliveTime is specified in.
+         * @param workQueue
+         *      A BlockingQueue implementation that will be used to hold Runnable tasks
+         *      that are awaiting execution within this executor.  The Executor takes
+         *      ownership of the BlockingQueue instance passed once this method returns.
+         * @param threadFactory
+         *      A ThreadFactory implementation that will be used to create worker threads
+         *      that are used by this executor to run the submitted tasks.  The Executor takes
+         *      ownership of the ThreadFactory instance passed once this method returns.
+         *
+         * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative
+         *         or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize
+         *         is greater than maximumPoolSize.
+         * @throws NullPointerException if the workQueue pointer is NULL.
          */
-        virtual void shutdown();
+        ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                           long long keepAliveTime, const TimeUnit& unit,
+                           BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                           ThreadFactory* threadFactory);
 
         /**
-         * The caller will block until the executor has completed termination meaning all tasks
-         * that where scheduled before shutdown have now completed and the executor is ready for
-         * deletion.  If the timeout period elapses before the executor reaches the terminated
-         * state then this method return false to indicate it has not terminated.
+         * Creates a new instance of a ThreadPoolExecutor.
          *
-         * @param timeout
-         *      The amount of time to wait before abandoning the wait for termination.
-         * @param unit
-         *      The unit of time that the timeout value represents.
+         * The executor instance is configured with the passed in parameters and a
+         * default thread Factory is used along with a default rejected execution
+         * handler.
          *
-         * @return true if the executor terminated or false if the timeout expired.
+         * @param corePoolSize
+         *      The number of threads to pool regardless of their idle state.
+         * @param maxPoolSize
+         *      The maximum number of threads that will ever exist at one time in the pool.
+         * @param keepAliveTime
+         *      The maximum time to keep a thread in the pool for if the number of current
+         *      threads exceeds to core pool size.
+         * @param unit
+         *      The units that the keepAliveTime is specified in.
+         * @param workQueue
+         *      A BlockingQueue implementation that will be used to hold Runnable tasks
+         *      that are awaiting execution within this executor.  The Executor takes
+         *      ownership of the BlockingQueue instance passed once this method returns.
+         * @param threadFactory
+         *      A ThreadFactory implementation that will be used to create worker threads
+         *      that are used by this executor to run the submitted tasks.  The Executor takes
+         *      ownership of the ThreadFactory instance passed once this method returns.
+         * @param handler
+         *      A RejectedExecutionHandler implementation that will be used to handle any
+         *      rejected tasks when they are submitted to this executor.  The Executor takes
+         *      ownership of the BlockingQueue instance passed once this method returns.
          *
-         * @throws InterruptedException if this call is interrupted while awaiting termination.
+         * @throws IllegalArguementException if the corePoolSize or keepAliveTime are negative
+         *         or the or if maximumPoolSize is less than or equal to zero, or if corePoolSize
+         *         is greater than maximumPoolSize.
+         * @throws NullPointerException if the workQueue pointer is NULL.
          */
+        ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                           long long keepAliveTime, const TimeUnit& unit,
+                           BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                           ThreadFactory* threadFactory,
+                           RejectedExecutionHandler* handler);
+
+        virtual ~ThreadPoolExecutor();
+
+        virtual void execute(decaf::lang::Runnable* task);
+
+        virtual void shutdown();
+
+        virtual ArrayList<decaf::lang::Runnable*> shutdownNow();
+
         virtual bool awaitTermination(long long timeout, const decaf::util::concurrent::TimeUnit& unit);
 
+        virtual bool isShutdown() const;
+
+        virtual bool isTerminated() const;
+
         /**
          * Returns the number of threads that currently exists for this Executor.
          *
@@ -150,6 +237,21 @@ namespace concurrent{
         virtual int getCorePoolSize() const;
 
         /**
+         * Set the number of threads that this executor treats as its core threads, this value
+         * will override the value set in the constructor.  If the value given is less than the
+         * current value then the core threads will shrink to the new value over time.  If the
+         * value is larger than the current value then new threads may be started to process
+         * currently pending tasks, otherwise they will be started as needed when new tasks
+         * arrive.
+         *
+         * @param poolSize
+         *      The new core pool size for this executor.
+         *
+         * @throws IllegalArgumentException if the pool size value is less than zero.
+         */
+        virtual void setCorePoolSize(int poolSize);
+
+        /**
          * Returns the configured maximum number of threads for this Executor.
          *
          * @return the configured maximum number of Threads.
@@ -157,6 +259,19 @@ namespace concurrent{
         virtual int getMaximumPoolSize() const;
 
         /**
+         * Sets the maximum number of workers this Executor is allowed to have at any given
+         * time above the core pool size.  This new value overrides any set in the constructor
+         * and if smaller than the current value worker threads will terminate as they complete
+         * their current tasks and become idle.
+         *
+         * @param maxSize
+         *      The new maximum allowed worker pool size.
+         *
+         * @throws IllegalArgumentException if maxSize is negative or less than core pool size.
+         */
+        virtual void setMaximumPoolSize(int maxSize);
+
+        /**
          * Returns the current number of pending tasks in the work queue.  This is
          * an approximation as the number of pending tasks can quickly changes as
          * tasks complete and new tasks are started.
@@ -190,28 +305,309 @@ namespace concurrent{
         virtual int getLargestPoolSize() const;
 
         /**
-         * Returns whether this executor has been shutdown or not.
+         * Provides access to the Task Queue used by this Executor.  This method is meant mainly
+         * for debugging and monitoring, care should be taken when using this method.  The executor
+         * continues to execute tasks from the Queue.
          *
-         * @return true if this executor has been shutdown.
+         * @returns a pointer to the blocking queue that this executor stores future tasks in.
          */
-        virtual bool isShutdown() const;
+        virtual BlockingQueue<decaf::lang::Runnable*>* getQueue();
 
         /**
-         * Returns whether all tasks have completed after this executor was shut down.
+         * Returns true if the executor has begin the process of terminating but has not yet
+         * completed the process of shutting down all worker threads.  If the Executor does
+         * not transition from this state to terminated after some time its generally an
+         * indication that one of the submitted tasks will not complete and the executor is
+         * locked in a terminating state.
          *
          * @return true if all tasks have completed after a request to shut down was made.
          */
-        virtual bool isTerminated() const;
+        virtual bool isTerminating() const;
+
+        /**
+         * When true this setting allows the threads in the core pool to terminate if they
+         * sit idle longer than the set keep alive time.  Core threads that terminate are
+         * replaced as needed by new ones on demand.  This settings requires that the set
+         * keep alive time be greater than zero and will throw an IllegalArguementException
+         * if that is not the case.
+         *
+         * @param value
+         *      Boolean value indicating if core threads are allowed to time out when idle.
+         *
+         * @throws IllegalArgumentException if the keep alive time is set to zero.
+         */
+        virtual void allowCoreThreadTimeout(bool value);
+
+        /**
+         * Returns whether this executor has been configured to allow core threads to
+         * terminate if they sit idle longer than the configured keep alive time. Threads
+         * that are not core threads continue to time out using the set keep alive value
+         * regardless of whether this option is enabled.
+         *
+         * @returns true if core threads can timeout when idle.
+         */
+        virtual bool allowsCoreThreadTimeout() const;
+
+        /**
+         * Returns the currently set value for the maximum amount of time a worker Thread
+         * that is not part of the core threads is allowed to sit idle before it terminates.
+         *
+         * @param unit
+         *      The unit of time to return the results in.
+         *
+         * @returns the configure keep alive time in the requested time units.
+         */
+        virtual long long getKeepAliveTime(const TimeUnit& unit) const;
+
+        /**
+         * Configures the amount of time a non core Thread will remain alive after it has
+         * completed its assigned task.  This value can also be applied to core threads if
+         * the allowCoreThreadsTimeout option is enabled.
+         *
+         * @param timeout
+         *      The amount of time an idle worker will live before terminating.
+         * @param unit
+         *      The units that the timeout is given in.
+         *
+         * @throws IllegalArgumentException if allowCoreThreadsTimeout is enabled and the
+         *         the timeout value given is zero, or the timeout given is negative.
+         */
+        virtual void setKeepAliveTime(long long timeout, const TimeUnit& unit);
+
+        /**
+         * Sets the ThreadFactory instance used to create new Threads for this Executor.
+         *
+         * This class takes ownership of the given ThreadFactory and will destroy it
+         * upon termination or when a new ThreadFactory is set using this method.
+         *
+         * @param factory
+         *      A ThreadFactory instance used by this Executor to create new Threads.
+         *
+         * @throws NullPointerException if the given factory pointer is NULL.
+         */
+        virtual void setThreadFactory(ThreadFactory* factory);
+
+        /**
+         * Gets the currently configured ThreadFactory.  It is considered a programming
+         * error to delete the pointer returned by this method.
+         *
+         * @returns the currently configured ThreadFactory instance used by this object.
+         */
+        virtual ThreadFactory* getThreadFactory() const;
+
+        /**
+         * Gets the currently configured RejectedExecutionHandler for this Executor.
+         *
+         * @returns a pointer to the current RejectedExecutionHandler.
+         */
+        virtual RejectedExecutionHandler* getRejectedExecutionHandler() const;
+
+        /**
+         * Sets the new RejectedExecutionHandler that this executor should use to process any
+         * rejected Runnable tasks.  This executor takes ownership of the supplied pointer and
+         * will desotroy it upon termination, any previous handler is destroyed by this call.
+         *
+         * @param handler
+         *      The new RejectedExecutionHandler instance to use.
+         *
+         * @throws NullPointerException if the handler is NULL.
+         */
+        virtual void setRejectedExecutionHandler(RejectedExecutionHandler* handler);
+
+        /**
+         * By default a Core thread is only created once the first task is queued, this method
+         * forces the creation of core thread that waits in an idle mode for new work to be
+         * enqueued.  If the limit on core threads has already been reached then this method
+         * returns false.
+         *
+         * @return true if a new core thread was added, false otherwise.
+         */
+        virtual bool prestartCoreThread();
+
+        /**
+         * This method will create and start new core threads running in an idle state waiting for
+         * new tasks up to the set core thread limit.  When the limit is reached this method returns
+         * zero to indicate no more core threads can be created.
+         *
+         * @returns the number of core threads created, or zero if the limit has already been met.
+         */
+        virtual int prestartAllCoreThreads();
+
+        /**
+         * Attempts to remove the Runnable from the work queue, if successful then the caller
+         * now owns the Runnable and is responsible for deleting it.
+         *
+         * @param task
+         *      The task that is to be removed from the work queue.
+         *
+         * @returns true if the task was removed from the Queue.
+         */
+        bool remove(decaf::lang::Runnable* task);
+
+        /**
+         * Attempts to remove any Future derived tasks from the pending work queue if they have
+         * been canceled.  This method can be used to more quickly remove and reclaim space as
+         * canceled tasks are not run but must await a worker thread to be removed normally.
+         * Since there are multiple threads in operation its possible for this method to not
+         * remove all canceled tasks from the work queue.
+         */
+        virtual void purge();
 
     protected:
 
         /**
+         * Method called before a task is executed by the given thread.  The default implementation
+         * of this method does nothing, however a subclass can override this method to add some
+         * new functionality.
+         *
+         * It is recommended that a subclass call this method on its base class to ensure that
+         * all base classes have a chance to process this event.
+         *
+         * @param thread
+         *      The thread that will be executing the given task.
+         * @param task
+         *      The task that will be executed by the given thread.
+         */
+        virtual void beforeExecute(decaf::lang::Thread* thread, decaf::lang::Runnable* task);
+
+        /**
+         * Called upon completion of execution of a given task.  This method is called
+         * from the Thread that executed the given Runnable.  If the Throwable pointer is
+         * not NULL then its value is the Exception that caused the task to terminate.
+         *
+         * The base class implementation does nothing, a derived class should call this
+         * method on its base class to ensure that all subclasses have a chance to process
+         * the afterExecute event.
+         *
+         * @param task
+         *      The Runnable instance that was executed by the calling Thread.
+         * @param error
+         *      The exception that was thrown from the given Runnable.
+         */
+        virtual void afterExecute(decaf::lang::Runnable* task, decaf::lang::Throwable* error);
+
+        /**
          * Method invoked when the Executor has terminated, by default this method does
          * nothing.  When overridden the subclass should call superclass::terminated to
          * ensure that all subclasses have their terminated method invoked.
          */
         virtual void terminated();
 
+    public:  // RejectedExecutionHandler implementations.
+
+        /**
+         * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute
+         * this class always throws a RejectedExecutionException.
+         *
+         * @since 1.0
+         */
+        class AbortPolicy : public RejectedExecutionHandler {
+        public:
+
+            AbortPolicy() : RejectedExecutionHandler() {
+            }
+
+            virtual ~AbortPolicy() {
+            }
+
+            virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
+                delete task;
+                throw RejectedExecutionException(__FILE__, __LINE__, "Unable to execute task.");
+            }
+
+        };
+
+        /**
+         * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute
+         * this class will attempt to run the task in the Thread that called the execute method
+         * unless the executor is shutdown in which case the task is not run and is destroyed..
+         *
+         * @since 1.0
+         */
+        class CallerRunsPolicy : public RejectedExecutionHandler {
+        public:
+
+            CallerRunsPolicy() : RejectedExecutionHandler() {
+            }
+
+            virtual ~CallerRunsPolicy() {
+            }
+
+            virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
+
+                if (executer->isShutdown()) {
+                    delete task;
+                    return;
+                }
+
+                try{
+                    task->run();
+                } catch(decaf::lang::Exception& ex) {
+                    delete task;
+                    throw ex;
+                }
+            }
+
+            /**
+             * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute
+             * this class always destroys the rejected task and returns quietly.
+             *
+             * @since 1.0
+             */
+            class DiscardPolicy : public RejectedExecutionHandler {
+            public:
+
+                DiscardPolicy() : RejectedExecutionHandler() {
+                }
+
+                virtual ~DiscardPolicy() {
+                }
+
+                virtual void rejectedExecution(decaf::lang::Runnable* task, ThreadPoolExecutor* executer DECAF_UNUSED) {
+                    delete task;
+                }
+
+            };
+
+            /**
+             * Handler policy for tasks that are rejected upon a call to ThreadPoolExecutor::execute
+             * this class always destroys the oldest unexecuted task in the Queue and then attempts
+             * to execute the rejected task using the passed in executor..
+             *
+             * @since 1.0
+             */
+            class DiscardOldestPolicy : public RejectedExecutionHandler {
+            public:
+
+                DiscardOldestPolicy() : RejectedExecutionHandler() {
+                }
+
+                virtual ~DiscardOldestPolicy() {
+                }
+
+                virtual void rejectedExecution( decaf::lang::Runnable* task, ThreadPoolExecutor* executer ) {
+
+                    if (executer->isShutdown()) {
+                        delete task;
+                        return;
+                    }
+
+                    try{
+
+                        decaf::lang::Runnable* oldest = NULL;
+                        executer->getQueue()->poll(oldest);
+                        delete oldest;
+
+                        executer->execute(task);
+                    } catch(decaf::lang::Exception& ex) {
+                        delete task;
+                        throw ex;
+                    }
+                }
+
+            };
+
+        };
     };
 
 }}}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AbstractOwnableSynchronizer.h"
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::locks;
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractOwnableSynchronizer::AbstractOwnableSynchronizer() : ownerThread() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractOwnableSynchronizer::~AbstractOwnableSynchronizer() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Thread* AbstractOwnableSynchronizer::getExclusiveOwnerThread() const {
+    return this->ownerThread;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractOwnableSynchronizer::setExclusiveOwnerThread(decaf::lang::Thread* thread) {
+    this->ownerThread = thread;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_
+#define _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_
+
+#include <decaf/util/Config.h>
+
+namespace decaf {
+namespace lang {
+    class Thread;
+}
+namespace util {
+namespace concurrent {
+namespace locks {
+
+    /**
+     * Base class for locks that provide the notion of Ownership, the types of locks
+     * that are implemented using this base class would be owned by one specific Thread
+     * at any given time.
+     *
+     * @since 1.0
+     */
+    class DECAF_API AbstractOwnableSynchronizer {
+    private:
+
+        decaf::lang::Thread* ownerThread;
+
+    public:
+
+        virtual ~AbstractOwnableSynchronizer();
+
+    protected:
+
+        AbstractOwnableSynchronizer();
+
+        /**
+         * Gets the Thread that was last set using the setExclusiveOwnerThread method, or NULL
+         * if no Thread has been made the exclusive owner.
+         *
+         * @return pointer to the owner Thread or NULL if not set.
+         */
+        decaf::lang::Thread* getExclusiveOwnerThread() const;
+
+        /**
+         * Sets the Thread that has exclusive ownership of this Synchronizer, can be NULL
+         * to indicate that no Thread now owns this Synchronizer.
+         *
+         * @param thread
+         *      The Thread that now has ownership, or NULL if ownership is released.
+         */
+        void setExclusiveOwnerThread(decaf::lang::Thread* thread);
+
+    };
+
+}}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_ABSTRACTOWNABLESYNCHRONIZER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Mon Apr 11 20:22:39 2011
@@ -207,6 +207,7 @@ cc_sources = \
     decaf/util/concurrent/CopyOnWriteArrayListTest.cpp \
     decaf/util/concurrent/CopyOnWriteArraySetTest.cpp \
     decaf/util/concurrent/CountDownLatchTest.cpp \
+    decaf/util/concurrent/ExecutorsTest.cpp \
     decaf/util/concurrent/LinkedBlockingQueueTest.cpp \
     decaf/util/concurrent/MutexTest.cpp \
     decaf/util/concurrent/SynchronousQueueTest.cpp \
@@ -429,6 +430,7 @@ h_sources = \
     decaf/util/concurrent/CopyOnWriteArrayListTest.h \
     decaf/util/concurrent/CopyOnWriteArraySetTest.h \
     decaf/util/concurrent/CountDownLatchTest.h \
+    decaf/util/concurrent/ExecutorsTest.h \
     decaf/util/concurrent/LinkedBlockingQueueTest.h \
     decaf/util/concurrent/MutexTest.h \
     decaf/util/concurrent/SynchronousQueueTest.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExecutorsTest.h"
+
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/Executors.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class DefaultThreadFactoryRunnable : public Runnable {
+    private:
+
+        CountDownLatch* shutdown;
+
+    public:
+
+        DefaultThreadFactoryRunnable(CountDownLatch* shutdown) : Runnable(), shutdown(shutdown) {
+        }
+
+        virtual ~DefaultThreadFactoryRunnable() {}
+
+        virtual void run() {
+            this->shutdown->await();
+        }
+
+        void signalDone() {
+            this->shutdown->countDown();
+        }
+    };
+
+    class NoOpRunnable : public Runnable {
+    public:
+
+        NoOpRunnable() : Runnable() {
+        }
+
+        virtual ~NoOpRunnable() {}
+
+        virtual void run() {
+        }
+    };
+
+    class SimpleThreadFactory : public ThreadFactory{
+    public:
+
+        virtual Thread* newThread(Runnable* task) {
+            return new Thread(task);
+        }
+    };
+
+    void joinPool(Pointer<ExecutorService>& exec) {
+        try {
+            exec->shutdown();
+            CPPUNIT_ASSERT(exec->awaitTermination(5000, TimeUnit::MILLISECONDS));
+        } catch(InterruptedException& ie) {
+            CPPUNIT_FAIL("Unexpected exception");
+        }
+    }
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorsTest::ExecutorsTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorsTest::~ExecutorsTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorsTest::testDefaultThreadFactory() {
+
+    CountDownLatch shutdown(1);
+    Pointer<ThreadFactory> defaultFactory;
+    DefaultThreadFactoryRunnable* runner = new DefaultThreadFactoryRunnable(&shutdown);
+
+    defaultFactory.reset(Executors::getDefaultThreadFactory());
+
+    Thread* theThread = defaultFactory->newThread(runner);
+
+    CPPUNIT_ASSERT(theThread != NULL);
+    CPPUNIT_ASSERT_EQUAL(false, theThread->isDaemon());
+    const int expected = Thread::NORM_PRIORITY;
+    CPPUNIT_ASSERT_EQUAL(expected, theThread->getPriority());
+
+    theThread->start();
+
+    shutdown.countDown();
+    theThread->join();
+
+    delete theThread;
+    delete runner;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorsTest::testNewFixedThreadPool1() {
+    Pointer<ExecutorService> e(Executors::newFixedThreadPool(2));
+
+    e->execute(new NoOpRunnable());
+    e->execute(new NoOpRunnable());
+    e->execute(new NoOpRunnable());
+
+    joinPool(e);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorsTest::testNewFixedThreadPool2() {
+
+    Pointer<ExecutorService> e(Executors::newFixedThreadPool(2, new SimpleThreadFactory()));
+
+    e->execute(new NoOpRunnable());
+    e->execute(new NoOpRunnable());
+    e->execute(new NoOpRunnable());
+
+    joinPool(e);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorsTest::testNewFixedThreadPool3() {
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should throw a NullPointerException",
+        Executors::newFixedThreadPool(2, NULL),
+        NullPointerException);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorsTest::testNewFixedThreadPool4() {
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should throw a IllegalArgumentException",
+        Executors::newFixedThreadPool(0),
+        IllegalArgumentException);
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_
+#define _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+    class ExecutorsTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( ExecutorsTest );
+        CPPUNIT_TEST( testDefaultThreadFactory );
+        CPPUNIT_TEST( testNewFixedThreadPool1 );
+        CPPUNIT_TEST( testNewFixedThreadPool2 );
+        CPPUNIT_TEST( testNewFixedThreadPool3 );
+        CPPUNIT_TEST( testNewFixedThreadPool4 );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        ExecutorsTest();
+        virtual ~ExecutorsTest();
+
+        void testDefaultThreadFactory();
+        void testNewFixedThreadPool1();
+        void testNewFixedThreadPool2();
+        void testNewFixedThreadPool3();
+        void testNewFixedThreadPool4();
+
+    };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_EXECUTORSTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Mon Apr 11 20:22:39 2011
@@ -275,6 +275,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( decaf::
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::MutexTest );
 #include <decaf/util/concurrent/ThreadPoolExecutorTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ThreadPoolExecutorTest );
+#include <decaf/util/concurrent/ExecutorsTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ExecutorsTest );
 #include <decaf/util/concurrent/TimeUnitTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::TimeUnitTest );
 #include <decaf/util/concurrent/LinkedBlockingQueueTest.h>