You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/21 00:23:35 UTC

svn commit: r1148957 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent: Executors.cpp Executors.h Future.h

Author: tabish
Date: Wed Jul 20 22:23:34 2011
New Revision: 1148957

URL: http://svn.apache.org/viewvc?rev=1148957&view=rev
Log:
Implements some more of the builder methods inside of the Executors API and cleans up some API docs

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp Wed Jul 20 22:23:34 2011
@@ -26,6 +26,7 @@
 #include <decaf/util/concurrent/ThreadFactory.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/util/concurrent/AbstractExecutorService.h>
 
 using namespace decaf;
 using namespace decaf::util;
@@ -72,6 +73,53 @@ namespace {
         }
     };
 
+    class NonConfigurableExecutorService : public AbstractExecutorService {
+    private:
+
+        ExecutorService* service;
+
+    public:
+
+        NonConfigurableExecutorService(ExecutorService* service) : AbstractExecutorService(), service(service) {
+        }
+
+        virtual ~NonConfigurableExecutorService() {
+            try{
+                delete service;
+            }
+            DECAF_CATCHALL_NOTHROW()
+        }
+
+        virtual void execute(decaf::lang::Runnable* command) {
+            this->service->execute(command);
+        }
+
+        virtual void execute(decaf::lang::Runnable* command, bool takeOwnership) {
+            this->service->execute(command, takeOwnership);
+        }
+
+        virtual bool awaitTermination(long long timeout, const TimeUnit& unit) {
+            return this->service->awaitTermination(timeout, unit);
+        }
+
+        virtual void shutdown() {
+            this->service->shutdown();
+        }
+
+        virtual ArrayList<decaf::lang::Runnable*> shutdownNow() {
+            return this->service->shutdownNow();
+        }
+
+        virtual bool isShutdown() const {
+            return this->service->isShutdown();
+        }
+
+        virtual bool isTerminated() const {
+            return this->service->isTerminated();
+        }
+
+    };
+
     AtomicInteger* DefaultThreadFactory::poolNumber = NULL;
 }
 
@@ -155,3 +203,92 @@ ExecutorService* Executors::newFixedThre
         throw Exception();
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newSingleThreadExecutor() {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            1, 1, 0, TimeUnit::MILLISECONDS, backingQ.get());
+
+        backingQ.release();
+
+        NonConfigurableExecutorService* result = new NonConfigurableExecutorService(service);
+
+        return result;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newSingleThreadExecutor(ThreadFactory* threadFactory) {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            1, 1, 0, TimeUnit::MILLISECONDS, backingQ.get(), threadFactory);
+
+        backingQ.release();
+
+        NonConfigurableExecutorService* result = new NonConfigurableExecutorService(service);
+
+        return result;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::unconfigurableExecutorService(ExecutorService* executor) {
+
+    try{
+
+        if (executor == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The wrapped service cannot be NULL");
+        }
+
+        NonConfigurableExecutorService* result = new NonConfigurableExecutorService(executor);
+
+        return result;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw ex;
+    } catch(...) {
+        throw Exception();
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h Wed Jul 20 22:23:34 2011
@@ -21,8 +21,12 @@
 #include <decaf/util/Config.h>
 
 #include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/util/concurrent/ThreadFactory.h>
+#include <decaf/util/concurrent/Callable.h>
+
+#include <decaf/lang/exceptions/NullPointerException.h>
 
 namespace decaf {
 namespace util {
@@ -38,6 +42,40 @@ namespace concurrent {
     class DECAF_API Executors {
     private:
 
+        /**
+         * A Callable subclass that runs the given task and then returns the given result.
+         */
+        template<typename E>
+        class RunnableAdapter : public Callable<E> {
+        private:
+
+            decaf::lang::Runnable* task;
+            bool owns;
+            E result;
+
+        public:
+
+            RunnableAdapter(decaf::lang::Runnable* task, bool owns, const E& result) :
+                Callable<E>(), task(task), owns(owns), result(result) {
+            }
+
+            virtual ~RunnableAdapter() {
+                try{
+                    if (owns) {
+                        delete this->task;
+                    }
+                }
+                DECAF_CATCHALL_NOTHROW()
+            }
+
+            virtual E call() {
+                this->task->run();
+                return result;
+            }
+        };
+
+    private:
+
         Executors();
         Executors(const Executors&);
         Executors& operator= (const Executors&);
@@ -98,6 +136,104 @@ namespace concurrent {
          */
         static ExecutorService* newFixedThreadPool(int nThreads, ThreadFactory* threadFactory);
 
+        /**
+         * Creates an Executor that uses a single worker thread operating off an unbounded queue
+         * owned by the executor.  If the Executor's single thread should terminate for some reason
+         * such as failure during the execution of a task, a new Thread will be created if the Executor
+         * has not been shut down and there are more tasks in the queue.  The Executor returned from
+         * this method is owned by the caller but, unlike the Executor returned from the method
+         * newFixedThreadPool(1), this one cannot be reconfigured to use more threads later on.
+         *
+         * @returns a new Executor pointer that is owned by the caller.
+         */
+        static ExecutorService* newSingleThreadExecutor();
+
+        /**
+         * Creates an Executor that uses a single worker thread operating off an unbounded queue
+         * owned by the executor.  If the Executor's single thread should terminate for some reason
+         * such as failure during the execution of a task, a new Thread will be created if the Executor
+         * has not been shut down and there are more tasks in the queue.  The Executor returned from
+         * this method is owned by the caller but, unlike the Executor returned from the method
+         * newFixedThreadPool(1), this one cannot be reconfigured to use more threads later on.
+         *
+         * @param threadFactory
+         *      Instance of a ThreadFactory that will be used by the Executor to spawn new
+         *      worker threads.  This parameter cannot be NULL and ownership passes to the Executor.
+         *
+         * @returns a new Executor pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if threadFactory is NULL.
+         */
+        static ExecutorService* newSingleThreadExecutor(ThreadFactory* threadFactory);
+
+        /**
+         * Returns a new ExecutorService derived instance that wraps and takes ownership of the given
+         * ExecutorService pointer.  The returned ExecutorService delegates all calls to the wrapped
+         * ExecutorService instance but does not allow any configuration changes.  This method provides
+         * a means of locking an ExecutorService instance configuration and prevents changes that might
+         * be accomplished with casting.
+         *
+         * @param executor
+         *      The ExecutorService pointer to wrap and take ownership of.
+         *
+         * @returns a new ExecutorService pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the executor argument is NULL.
+         */
+        static ExecutorService* unconfigurableExecutorService(ExecutorService* executor);
+
+    public:
+
+        /**
+         * Returns a Callable object that, when called, runs the given task and returns the default
+         * value of the template type E (or E()).
+         *
+         * @param task
+         *      The Runnable task that is to be executed.
+         * @param owns
+         *      Whether the Callable takes ownership of (and deletes) the Runnable task; defaults to true.
+         *
+         * @returns a new Callable<E> pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the Runnable task is NULL
+         */
+        template<typename E>
+        static Callable<E>* callable(decaf::lang::Runnable* task, bool owns = true) {
+
+            if (task == NULL) {
+                throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
+                    "The Runnable task argument cannot be NULL");
+            }
+
+            return new RunnableAdapter<E>(task, owns, E());
+        }
+
+        /**
+         * Returns a Callable object that, when called, runs the given task and returns the
+         * supplied result value.
+         *
+         * @param task
+         *      The Runnable task that is to be executed.
+         * @param result
+         *      The value that is returned from the callable upon completion.
+         * @param owns
+         *      Whether the Callable takes ownership of (and deletes) the Runnable task; defaults to true.
+         *
+         * @returns a new Callable<E> pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the Runnable task is NULL
+         */
+        template<typename E>
+        static Callable<E>* callable(decaf::lang::Runnable* task, const E& result, bool owns = true) {
+
+            if (task == NULL) {
+                throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__,
+                    "The Runnable task argument cannot be NULL");
+            }
+
+            return new RunnableAdapter<E>(task, owns, result);
+        }
+
     private:
 
         static void initialize();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h Wed Jul 20 22:23:34 2011
@@ -51,7 +51,7 @@ namespace concurrent {
          * @returns false if the task could not be canceled, typically because it has
          *          already completed normally; true otherwise
          */
-        virtual bool cancel( bool mayInterruptIfRunning ) = 0;
+        virtual bool cancel(bool mayInterruptIfRunning) = 0;
 
         /**
          * Returns true if this task was canceled before it completed normally.
@@ -92,25 +92,32 @@ namespace concurrent {
 
         /**
          * Waits if necessary for the computation to complete, and then retrieves its result.
+         *
          * @returns the computed result.
-         * @throws CancellationException - if the computation was canceled
-         * @throws ExecutionException - if the computation threw an exception
-         * @throws InterruptedException - if the current thread was interrupted while waiting
+         *
+         * @throws CancellationException if the computation was canceled
+         * @throws ExecutionException if the computation threw an exception
+         * @throws InterruptedException if the current thread was interrupted while waiting
          */
         virtual V get() = 0;
 
         /**
          * Waits if necessary for at most the given time for the computation to complete, and
          * then retrieves its result, if available.
-         * @param timeout - the maximum time to wait
-         * @param unit - the time unit of the timeout argument
+         *
+         * @param timeout
+         *      The maximum time to wait for this Future to finish.
+         * @param unit
+         *      The time unit of the timeout argument.
+         *
          * @returns the computed result
-         * @throws CancellationException - if the computation was canceled
-         * @throws ExecutionException - if the computation threw an exception
-         * @throws InterruptedException - if the current thread was interrupted while waiting
-         * @throws TimeoutException - if the wait timed out
+         *
+         * @throws CancellationException if the computation was canceled
+         * @throws ExecutionException if the computation threw an exception
+         * @throws InterruptedException if the current thread was interrupted while waiting
+         * @throws TimeoutException if the wait timed out before the future completed.
          */
-        virtual V get( long long timeout, const TimeUnit& unit ) = 0;
+        virtual V get(long long timeout, const TimeUnit& unit) = 0;
 
     };