You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/07/21 00:23:35 UTC
svn commit: r1148957 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent:
Executors.cpp Executors.h Future.h
Author: tabish
Date: Wed Jul 20 22:23:34 2011
New Revision: 1148957
URL: http://svn.apache.org/viewvc?rev=1148957&view=rev
Log:
Implements some more of the builder methods inside of the Executors API and cleans up some API docs
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp Wed Jul 20 22:23:34 2011
@@ -26,6 +26,7 @@
#include <decaf/util/concurrent/ThreadFactory.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/util/concurrent/AbstractExecutorService.h>
using namespace decaf;
using namespace decaf::util;
@@ -72,6 +73,53 @@ namespace {
}
};
+    /**
+     * ExecutorService implementation that owns another ExecutorService and forwards
+     * every call it receives to that wrapped instance.  Only the ExecutorService
+     * operations overridden here are passed through; no other method of the wrapped
+     * object is exposed by this type.
+     *
+     * The wrapped service is deleted when this object is destroyed.
+     */
+    class NonConfigurableExecutorService : public AbstractExecutorService {
+    private:
+
+        // The wrapped service, owned by this object and deleted in the destructor.
+        ExecutorService* service;
+
+    private:
+
+        // Declared but intentionally not defined.  This class deletes 'service' in its
+        // destructor, so an implicit member-wise copy would delete the same pointer twice.
+        NonConfigurableExecutorService(const NonConfigurableExecutorService&);
+        NonConfigurableExecutorService& operator= (const NonConfigurableExecutorService&);
+
+    public:
+
+        /**
+         * @param service
+         *      The ExecutorService to delegate to; this object takes ownership of it.
+         */
+        NonConfigurableExecutorService(ExecutorService* service) : AbstractExecutorService(), service(service) {
+        }
+
+        virtual ~NonConfigurableExecutorService() {
+            try{
+                delete service;
+            }
+            // A destructor must not let an exception escape.
+            DECAF_CATCHALL_NOTHROW()
+        }
+
+        virtual void execute(decaf::lang::Runnable* command) {
+            this->service->execute(command);
+        }
+
+        virtual void execute(decaf::lang::Runnable* command, bool takeOwnership) {
+            this->service->execute(command, takeOwnership);
+        }
+
+        virtual bool awaitTermination(long long timeout, const TimeUnit& unit) {
+            return this->service->awaitTermination(timeout, unit);
+        }
+
+        virtual void shutdown() {
+            this->service->shutdown();
+        }
+
+        virtual ArrayList<decaf::lang::Runnable*> shutdownNow() {
+            return this->service->shutdownNow();
+        }
+
+        virtual bool isShutdown() const {
+            return this->service->isShutdown();
+        }
+
+        virtual bool isTerminated() const {
+            return this->service->isTerminated();
+        }
+
+    };
+
AtomicInteger* DefaultThreadFactory::poolNumber = NULL;
}
@@ -155,3 +203,92 @@ ExecutorService* Executors::newFixedThre
throw Exception();
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a ThreadPoolExecutor with a core and maximum size of one thread, backed by a
+// new LinkedBlockingQueue, and returns it wrapped in a NonConfigurableExecutorService.
+// The returned pointer is owned by the caller.
+ExecutorService* Executors::newSingleThreadExecutor() {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+    Pointer< ExecutorService > service;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        service.reset(new ThreadPoolExecutor(
+            1, 1, 0, TimeUnit::MILLISECONDS, backingQ.get()));
+
+        // The pool was constructed successfully, so stop guarding the queue here.
+        backingQ.release();
+
+        // Keep the pool guarded until the wrapper exists so that a failure while
+        // allocating the wrapper does not leak the pool.
+        ExecutorService* result = new NonConfigurableExecutorService(service.get());
+        service.release();
+
+        return result;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        // Rethrow the caught object itself; 'throw ex' would throw a copy made from
+        // the static type of 'ex' and lose any more-derived exception type.
+        throw;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a ThreadPoolExecutor with a core and maximum size of one thread, backed by a
+// new LinkedBlockingQueue and using the supplied ThreadFactory, and returns it wrapped
+// in a NonConfigurableExecutorService.  The returned pointer is owned by the caller.
+ExecutorService* Executors::newSingleThreadExecutor(ThreadFactory* threadFactory) {
+
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+    Pointer< ExecutorService > service;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        service.reset(new ThreadPoolExecutor(
+            1, 1, 0, TimeUnit::MILLISECONDS, backingQ.get(), threadFactory));
+
+        // The pool was constructed successfully, so stop guarding the queue here.
+        backingQ.release();
+
+        // Keep the pool guarded until the wrapper exists so that a failure while
+        // allocating the wrapper does not leak the pool.
+        ExecutorService* result = new NonConfigurableExecutorService(service.get());
+        service.release();
+
+        return result;
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        // Rethrow the caught object itself; 'throw ex' would throw a copy made from
+        // the static type of 'ex' and lose any more-derived exception type.
+        throw;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Wraps the given executor in a NonConfigurableExecutorService, which takes ownership
+// of it.  Throws NullPointerException when the executor argument is NULL.
+ExecutorService* Executors::unconfigurableExecutorService(ExecutorService* executor) {
+
+    try{
+
+        if (executor == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "The wrapped service cannot be NULL");
+        }
+
+        return new NonConfigurableExecutorService(executor);
+
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        // Rethrow the caught object itself; 'throw ex' would throw a copy made from
+        // the static type of 'ex' and lose any more-derived exception type.
+        throw;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(...) {
+        throw Exception();
+    }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h Wed Jul 20 22:23:34 2011
@@ -21,8 +21,12 @@
#include <decaf/util/Config.h>
#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/util/concurrent/ThreadFactory.h>
+#include <decaf/util/concurrent/Callable.h>
+
+#include <decaf/lang/exceptions/NullPointerException.h>
namespace decaf {
namespace util {
@@ -38,6 +42,40 @@ namespace concurrent {
class DECAF_API Executors {
private:
+        /**
+         * A Callable subclass that runs a given Runnable task and then returns a
+         * result value that was fixed when the adapter was constructed.
+         *
+         * When constructed with owns set to true the adapter deletes the task in
+         * its destructor.
+         */
+        template<typename E>
+        class RunnableAdapter : public Callable<E> {
+        private:
+
+            decaf::lang::Runnable* task;
+            bool owns;
+            E result;
+
+        private:
+
+            // Declared but intentionally not defined.  The destructor may delete 'task',
+            // so an implicit member-wise copy could delete the same pointer twice.
+            RunnableAdapter(const RunnableAdapter&);
+            RunnableAdapter& operator= (const RunnableAdapter&);
+
+        public:
+
+            /**
+             * @param task
+             *      The Runnable that call() will run.
+             * @param owns
+             *      When true the task is deleted along with this adapter.
+             * @param result
+             *      The value that call() returns; a copy is stored in the adapter.
+             */
+            RunnableAdapter(decaf::lang::Runnable* task, bool owns, const E& result) :
+                Callable<E>(), task(task), owns(owns), result(result) {
+            }
+
+            virtual ~RunnableAdapter() {
+                try{
+                    if (owns) {
+                        delete this->task;
+                    }
+                }
+                // A destructor must not let an exception escape.
+                DECAF_CATCHALL_NOTHROW()
+            }
+
+            /**
+             * Runs the task and returns the stored result value.
+             */
+            virtual E call() {
+                this->task->run();
+                return result;
+            }
+        };
+
+ private:
+
Executors();
Executors(const Executors&);
Executors& operator= (const Executors&);
@@ -98,6 +136,104 @@ namespace concurrent {
*/
static ExecutorService* newFixedThreadPool(int nThreads, ThreadFactory* threadFactory);
+ /**
+ * Creates an Executor that uses a single worker thread operating off an unbounded queue
+ * owned by the executor. If the Executor's single thread should terminate for some reason
+ * such as failure during the execution of a task, a new Thread will be created if the Executor
+ * has not been shut down and there are more tasks in the queue. The Executor returned from this
+ * method is owned by the caller but unlike the Executor returned from the method
+ * newFixedThreadPool(1) this one cannot be reconfigured to use more threads later on.
+ *
+ * @returns a new Executor pointer that is owned by the caller.
+ */
+ static ExecutorService* newSingleThreadExecutor();
+
+ /**
+ * Creates an Executor that uses a single worker thread operating off an unbounded queue
+ * owned by the executor. If the Executor's single thread should terminate for some reason
+ * such as failure during the execution of a task, a new Thread will be created if the Executor
+ * has not been shut down and there are more tasks in the queue. The Executor returned from this
+ * method is owned by the caller but unlike the Executor returned from the method
+ * newFixedThreadPool(1) this one cannot be reconfigured to use more threads later on.
+ *
+ * @param threadFactory
+ * Instance of a ThreadFactory that will be used by the Executor to spawn new
+ * worker threads. This parameter cannot be NULL and ownership passes to the Executor.
+ *
+ * @returns a new Executor pointer that is owned by the caller.
+ *
+ * @throws NullPointerException if threadFactory is NULL.
+ */
+ static ExecutorService* newSingleThreadExecutor(ThreadFactory* threadFactory);
+
+ /**
+ * Returns a new ExecutorService derived instance that wraps and takes ownership of the given
+ * ExecutorService pointer. The returned ExecutorService delegates all calls to the wrapped
+ * ExecutorService instance but does not allow any configuration changes. This method provides
+ * a means of locking an ExecutorService instance configuration and prevents changes that might
+ * be accomplished with casting.
+ *
+ * @param executor
+ * The ExecutorService pointer to wrap and take ownership of.
+ *
+ * @returns a new ExecutorService pointer that is owned by the caller.
+ *
+ * @throws NullPointerException if the executor argument is NULL.
+ */
+ static ExecutorService* unconfigurableExecutorService(ExecutorService* executor);
+
+ public:
+
+        /**
+         * Builds a Callable that runs the supplied Runnable when called and then returns
+         * a default constructed value of the template type E, i.e. E().
+         *
+         * @param task
+         *      The Runnable task that the Callable will run.
+         * @param owns
+         *      Whether the Callable takes ownership of the Runnable pointer, default is true.
+         *
+         * @returns a new Callable<E> pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the Runnable task is NULL
+         */
+        template<typename E>
+        static Callable<E>* callable(decaf::lang::Runnable* task, bool owns = true) {
+
+            if (task != NULL) {
+                return new RunnableAdapter<E>(task, owns, E());
+            }
+
+            throw decaf::lang::exceptions::NullPointerException(
+                __FILE__, __LINE__, "The Runnable task argument cannot be NULL");
+        }
+
+        /**
+         * Builds a Callable that runs the supplied Runnable when called and then returns
+         * the result value that was passed to this method.
+         *
+         * @param task
+         *      The Runnable task that the Callable will run.
+         * @param result
+         *      The value that the Callable returns once the task has been run.
+         * @param owns
+         *      Whether the Callable takes ownership of the Runnable pointer, default is true.
+         *
+         * @returns a new Callable<E> pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the Runnable task is NULL
+         */
+        template<typename E>
+        static Callable<E>* callable(decaf::lang::Runnable* task, const E& result, bool owns = true) {
+
+            if (task != NULL) {
+                return new RunnableAdapter<E>(task, owns, result);
+            }
+
+            throw decaf::lang::exceptions::NullPointerException(
+                __FILE__, __LINE__, "The Runnable task argument cannot be NULL");
+        }
+
private:
static void initialize();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h?rev=1148957&r1=1148956&r2=1148957&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Future.h Wed Jul 20 22:23:34 2011
@@ -51,7 +51,7 @@ namespace concurrent {
* @returns false if the task could not be canceled, typically because it has
* already completed normally; true otherwise
*/
- virtual bool cancel( bool mayInterruptIfRunning ) = 0;
+ virtual bool cancel(bool mayInterruptIfRunning) = 0;
/**
* Returns true if this task was canceled before it completed normally.
@@ -92,25 +92,32 @@ namespace concurrent {
/**
* Waits if necessary for the computation to complete, and then retrieves its result.
+ *
* @returns the computed result.
- * @throws CancellationException - if the computation was canceled
- * @throws ExecutionException - if the computation threw an exception
- * @throws InterruptedException - if the current thread was interrupted while waiting
+ *
+ * @throws CancellationException if the computation was canceled
+ * @throws ExecutionException if the computation threw an exception
+ * @throws InterruptedException if the current thread was interrupted while waiting
*/
virtual V get() = 0;
/**
* Waits if necessary for at most the given time for the computation to complete, and
* then retrieves its result, if available.
- * @param timeout - the maximum time to wait
- * @param unit - the time unit of the timeout argument
+ *
+ * @param timeout
+ * The maximum time to wait for this Future to finish.
+ * @param unit
+ * The time unit of the timeout argument.
+ *
* @returns the computed result
- * @throws CancellationException - if the computation was canceled
- * @throws ExecutionException - if the computation threw an exception
- * @throws InterruptedException - if the current thread was interrupted while waiting
- * @throws TimeoutException - if the wait timed out
+ *
+ * @throws CancellationException if the computation was canceled
+ * @throws ExecutionException if the computation threw an exception
+ * @throws InterruptedException if the current thread was interrupted while waiting
+ * @throws TimeoutException if the wait timed out before the future completed.
*/
- virtual V get( long long timeout, const TimeUnit& unit ) = 0;
+ virtual V get(long long timeout, const TimeUnit& unit) = 0;
};