You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/11 22:22:40 UTC
svn commit: r1091195 [1/2] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/decaf/lang/
main/decaf/util/ main/decaf/util/concurrent/
main/decaf/util/concurrent/locks/ test/ test/decaf/util/concurrent/
Author: tabish
Date: Mon Apr 11 20:22:39 2011
New Revision: 1091195
URL: http://svn.apache.org/viewvc?rev=1091195&view=rev
Log:
Flush out some more of the Threading API and test what's working now. Cleans up some other API code for the next release.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ExecutorsTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Apr 11 20:22:39 2011
@@ -432,6 +432,7 @@ cc_sources = \
decaf/util/Timer.cpp \
decaf/util/TimerTask.cpp \
decaf/util/UUID.cpp \
+ decaf/util/concurrent/AbstractExecutorService.cpp \
decaf/util/concurrent/BlockingQueue.cpp \
decaf/util/concurrent/BrokenBarrierException.cpp \
decaf/util/concurrent/Callable.cpp \
@@ -443,10 +444,12 @@ cc_sources = \
decaf/util/concurrent/Delayed.cpp \
decaf/util/concurrent/Executor.cpp \
decaf/util/concurrent/ExecutorService.cpp \
+ decaf/util/concurrent/Executors.cpp \
decaf/util/concurrent/Future.cpp \
decaf/util/concurrent/LinkedBlockingQueue.cpp \
decaf/util/concurrent/Lock.cpp \
decaf/util/concurrent/Mutex.cpp \
+ decaf/util/concurrent/RejectedExecutionHandler.cpp \
decaf/util/concurrent/Semaphore.cpp \
decaf/util/concurrent/SynchronousQueue.cpp \
decaf/util/concurrent/ThreadFactory.cpp \
@@ -456,6 +459,7 @@ cc_sources = \
decaf/util/concurrent/atomic/AtomicInteger.cpp \
decaf/util/concurrent/atomic/AtomicRefCounter.cpp \
decaf/util/concurrent/atomic/AtomicReference.cpp \
+ decaf/util/concurrent/locks/AbstractOwnableSynchronizer.cpp \
decaf/util/concurrent/locks/LockSupport.cpp \
decaf/util/concurrent/locks/ReentrantLock.cpp \
decaf/util/logging/ConsoleHandler.cpp \
@@ -1019,6 +1023,7 @@ h_sources = \
decaf/util/TimerTask.h \
decaf/util/UUID.h \
decaf/util/comparators/Less.h \
+ decaf/util/concurrent/AbstractExecutorService.h \
decaf/util/concurrent/BlockingQueue.h \
decaf/util/concurrent/BrokenBarrierException.h \
decaf/util/concurrent/Callable.h \
@@ -1033,6 +1038,7 @@ h_sources = \
decaf/util/concurrent/ExecutionException.h \
decaf/util/concurrent/Executor.h \
decaf/util/concurrent/ExecutorService.h \
+ decaf/util/concurrent/Executors.h \
decaf/util/concurrent/Future.h \
decaf/util/concurrent/LinkedBlockingQueue.h \
decaf/util/concurrent/Lock.h \
@@ -1050,6 +1056,7 @@ h_sources = \
decaf/util/concurrent/atomic/AtomicInteger.h \
decaf/util/concurrent/atomic/AtomicRefCounter.h \
decaf/util/concurrent/atomic/AtomicReference.h \
+ decaf/util/concurrent/locks/AbstractOwnableSynchronizer.h \
decaf/util/concurrent/locks/Condition.h \
decaf/util/concurrent/locks/Lock.h \
decaf/util/concurrent/locks/LockSupport.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.cpp Mon Apr 11 20:22:39 2011
@@ -20,6 +20,12 @@
#include <decaf/lang/ArrayPointer.h>
#include <decaf/lang/System.h>
#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IndexOutOfBoundsException.h>
+#include <decaf/lang/Short.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
+#include <decaf/lang/Float.h>
+#include <decaf/lang/Double.h>
using namespace std;
using namespace decaf;
@@ -69,6 +75,60 @@ String::String( const std::string& sourc
}
////////////////////////////////////////////////////////////////////////////////
+String::String(const char* array, int size) : contents(NULL) {
+
+    // Validate before allocating: a constructor that throws never has its
+    // destructor run, so allocating Contents up front would leak it on the
+    // error paths below.
+    if( size < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "size parameter out of Bounds: %d.", size );
+    }
+
+    if( array == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Buffer pointer passed was NULL." );
+    }
+
+    this->contents = new Contents;
+
+    // A zero size leaves the freshly created Contents untouched (empty string).
+    if(size > 0) {
+
+        this->contents->value = ArrayPointer<unsigned char>(size);
+        this->contents->length = size;
+
+        System::arraycopy( (unsigned char*)array, 0, contents->value.get(), 0, size );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String::String(const char* array, int size, int offset, int length) : contents(NULL) {
+
+    // Validate before allocating: a constructor that throws never has its
+    // destructor run, so allocating Contents up front would leak it on the
+    // error paths below.
+    if( size < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "size parameter out of Bounds: %d.", size );
+    }
+
+    if( offset > size || offset < 0 ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "offset parameter out of Bounds: %d.", offset );
+    }
+
+    if( length < 0 || length > size - offset ) {
+        throw IndexOutOfBoundsException(
+            __FILE__, __LINE__, "length parameter out of Bounds: %d.", length );
+    }
+
+    if( array == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Buffer pointer passed was NULL." );
+    }
+
+    this->contents = new Contents;
+
+    // Guard on the number of bytes actually copied rather than on the buffer
+    // size, so an empty range leaves the Contents untouched instead of
+    // allocating a zero length array.
+    if(length > 0) {
+
+        this->contents->value = ArrayPointer<unsigned char>(length);
+        this->contents->length = length;
+
+        System::arraycopy( (unsigned char*)array, offset, contents->value.get(), 0, length );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
String::~String() {
try{
delete this->contents;
@@ -133,3 +193,43 @@ std::string String::toString() const {
return std::string( (const char*)this->contents->value.get(), this->length() );
}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(bool value) {
+
+ // Booleans map to the fixed literals "true" and "false".
+ if(value) {
+ return String("true");
+ }
+
+ return String("false");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(char value) {
+ // Build a one character String from the address of the by-value argument.
+ return String( &value, 1 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(float value) {
+ // Formatting is delegated to Float::toString.
+ return String( Float::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(double value) {
+ // Formatting is delegated to Double::toString.
+ return String( Double::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(short value) {
+ // Formatting is delegated to Short::toString.
+ return String( Short::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(int value) {
+ // Formatting is delegated to Integer::toString.
+ return String( Integer::toString(value) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+String String::valueOf(long long value) {
+ // Formatting is delegated to Long::toString.
+ return String( Long::toString(value) );
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/String.h Mon Apr 11 20:22:39 2011
@@ -63,6 +63,43 @@ namespace lang {
*/
String( const std::string& source );
+ /**
+ * Create a new String object that represents the given array of characters. The method
+ * takes the size of the array as a parameter to allow for strings that are not NULL
+ * terminated, the caller can pass strlen(array) in the case where the array is properly
+ * NULL terminated.
+ *
+ * @param array
+ * The character buffer to copy into this new String object.
+ * @param size
+ * The size of the string buffer given, in case the string is not NULL terminated.
+ *
+ * @throws NullPointerException if the character array parameter is NULL.
+ * @throws IndexOutOfBoundsException if the size parameter is negative.
+ */
+ String( const char* array, int size );
+
+ /**
+ * Create a new String object that represents the given array of characters. The method
+ * takes the size of the array as a parameter to allow for strings that are not NULL
+ * terminated, the caller can pass strlen(array) in the case where the array is properly
+ * NULL terminated.
+ *
+ * @param array
+ * The character buffer to copy into this new String object.
+ * @param size
+ * The size of the string buffer given, in case the string is not NULL terminated.
+ * @param offset
+ * The position to start copying from in the given buffer.
+ * @param length
+ * The number of bytes to copy from the given buffer starting from the offset.
+ *
+ * @throws NullPointerException if the character array parameter is NULL.
+ * @throws IndexOutOfBoundsException if the size, offset or length parameter is negative
+ * or if the length to copy is greater than the span of size - offset.
+ */
+ String( const char* array, int size, int offset, int length );
+
virtual ~String();
public:
@@ -100,6 +137,78 @@ namespace lang {
*/
virtual std::string toString() const;
+ public: // Static methods.
+
+ /**
+ * Returns a String that represents the value of the given boolean value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns "true" if the boolean is true, "false" otherwise.
+ */
+ static String valueOf(bool value);
+
+ /**
+ * Returns a String that represents the value of the given char value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the single character value given.
+ */
+ static String valueOf(char value);
+
+ /**
+ * Returns a String that represents the value of the given float value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the string representation of the float value given.
+ */
+ static String valueOf(float value);
+
+ /**
+ * Returns a String that represents the value of the given double value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the string representation of the double value given.
+ */
+ static String valueOf(double value);
+
+ /**
+ * Returns a String that represents the value of the given short value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the string representation of the short value given.
+ */
+ static String valueOf(short value);
+
+ /**
+ * Returns a String that represents the value of the given integer value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the string representation of the integer value given.
+ */
+ static String valueOf(int value);
+
+ /**
+ * Returns a String that represents the value of the given 64bit long value.
+ *
+ * @param value
+ * The value whose string representation is to be returned.
+ *
+ * @returns a String that contains the string representation of the 64 bit long value given.
+ */
+ static String valueOf(long long value);
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp Mon Apr 11 20:22:39 2011
@@ -28,8 +28,10 @@
#include <decaf/lang/Exception.h>
#include <decaf/lang/exceptions/RuntimeException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalThreadStateException.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Executors.h>
#include <vector>
@@ -231,11 +233,17 @@ void Thread::initThreading() {
// We mark the thread where Decaf's Init routine is called from as our Main Thread.
mainThread = Thread::createForeignThreadInstance( "Main Thread" );
+
+ // Initialize the Executors static data for use in ExecutorService classes.
+ Executors::initialize();
}
////////////////////////////////////////////////////////////////////////////////
void Thread::shutdownThreading() {
+ // First shutdown the Executors static data to remove dependencies on Threading.
+ Executors::shutdown();
+
// Clear the Main Thread instance pointer, this indicates we are Shutdown.
mainThread = NULL;
@@ -579,6 +587,21 @@ int Thread::getPriority() const {
}
////////////////////////////////////////////////////////////////////////////////
+bool Thread::isDaemon() const {
+ // Always reports false: setDaemon() does not yet record the flag (see its TODO).
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::setDaemon(bool value DECAF_UNUSED) {
+
+ // Reject the request once the thread state has advanced past NEW.
+ if(this->properties->state > Thread::NEW) {
+ throw IllegalThreadStateException(__FILE__, __LINE__, "Thread is already active.");
+ }
+
+ // The value argument is currently ignored (hence DECAF_UNUSED).
+ // TODO - Set thread to detached or joinable as indicated by the value arg.
+}
+
+////////////////////////////////////////////////////////////////////////////////
void Thread::setUncaughtExceptionHandler( UncaughtExceptionHandler* handler ) {
this->properties->exHandler = handler;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.h Mon Apr 11 20:22:39 2011
@@ -263,6 +263,25 @@ namespace lang{
void setPriority( int value );
/**
+ * Sets if the given Thread is a Daemon Thread or not. Daemon threads cannot be
+ * joined and their resources are automatically reclaimed when they terminate.
+ *
+ * @param value
+ * Boolean indicating if this thread should be a daemon thread or not.
+ *
+ * @throws IllegalThreadStateException if the thread is already active.
+ */
+ void setDaemon(bool value);
+
+ /**
+ * Returns whether this thread is a daemon thread or not, if true this thread cannot
+ * be joined.
+ *
+ * @return true if the thread is a daemon thread.
+ */
+ bool isDaemon() const;
+
+ /**
* Set the handler invoked when this thread abruptly terminates due to an uncaught exception.
*
* @returns a pointer to the set UncaughtExceptionHandler.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/AbstractQueue.h Mon Apr 11 20:22:39 2011
@@ -85,6 +85,8 @@ namespace util {
return AbstractCollection<E>::addAll( collection );
}
+ using AbstractCollection<E>::remove;
+
/**
* {@inheritDoc}
*
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AbstractExecutorService.h"
+
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractExecutorService::AbstractExecutorService() : ExecutorService() {
+ // Nothing to set up here; only the ExecutorService base constructor is invoked.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractExecutorService::~AbstractExecutorService() {
+ // Empty: this class holds no resources of its own.
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_
+#define _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/util/concurrent/Executor.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+ /**
+ * Provides a default implementation for the methods of the ExecutorService
+ * interface. Use this class as a starting point for custom executor service
+ * implementations.
+ *
+ * @since 1.0
+ */
+ class DECAF_API AbstractExecutorService : public ExecutorService {
+ public:
+
+ // Only construction and destruction are declared here; no ExecutorService
+ // methods are overridden in this declaration.
+ AbstractExecutorService();
+ virtual ~AbstractExecutorService();
+
+ };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_ABSTRACTEXECUTORSERVICE_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/BlockingQueue.h Mon Apr 11 20:22:39 2011
@@ -167,6 +167,9 @@ namespace concurrent {
virtual ~BlockingQueue() {
}
+ using Queue<E>::offer;
+ using Queue<E>::poll;
+
/**
* Inserts the specified element into this queue, waiting if necessary for space
* to become available.
@@ -179,8 +182,6 @@ namespace concurrent {
*/
virtual void put( const E& value ) = 0;
- using Queue<E>::offer;
-
/**
* Inserts the specified element into this queue, waiting up to the specified wait
* time if necessary for space to become available.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h Mon Apr 11 20:22:39 2011
@@ -20,6 +20,8 @@
#include <decaf/util/Config.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/Executor.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/lang/exceptions/InterruptedException.h>
@@ -55,20 +57,55 @@ namespace concurrent {
virtual ~ExecutorService() {}
/**
- * Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs,
- * or the current thread is interrupted, whichever happens first.
+ * The caller will block until the executor has completed termination, meaning all tasks
+ * that were scheduled before shutdown have now completed and the executor is ready for
+ * deletion. If the timeout period elapses before the executor reaches the terminated
+ * state then this method returns false to indicate it has not terminated.
*
* @param timeout
- * The amount of time to wait before timing out the Wait operation.
+ * The amount of time to wait before abandoning the wait for termination.
* @param unit
- * The Units that comprise the timeout value.
+ * The unit of time that the timeout value represents.
*
- * @returns true if the executer terminated before the given timeout value elapsed.
+ * @return true if the executor terminated or false if the timeout expired.
*
- * @throws InterruptedException - if interrupted while waiting.
+ * @throws InterruptedException if this call is interrupted while awaiting termination.
*/
virtual bool awaitTermination( long long timeout, const TimeUnit& unit ) = 0;
+ /**
+ * Performs an orderly shutdown of this Executor. Previously queued tasks are allowed
+ * to complete but no new tasks are accepted for execution. Calling this method more
+ * than once has no effect on this executor.
+ */
+ virtual void shutdown() = 0;
+
+ /**
+ * Attempts to stop all currently executing tasks and returns an ArrayList containing the
+ * Runnables that did not get executed. These objects become the property of the caller and
+ * are not deleted by this class; they are removed from the work queue and forgotten about.
+ *
+ * There is no guarantee that this method will halt execution of currently executing tasks.
+ *
+ * @return an ArrayList containing all Runnable instances that were still waiting to be
+ * executed by this class; the caller now owns those pointers.
+ */
+ virtual ArrayList<decaf::lang::Runnable*> shutdownNow() = 0;
+
+ /**
+ * Returns whether this executor has been shutdown or not.
+ *
+ * @return true if this executor has been shutdown.
+ */
+ virtual bool isShutdown() const = 0;
+
+ /**
+ * Returns whether all tasks have completed after this executor was shut down.
+ *
+ * @return true if all tasks have completed after a request to shut down was made.
+ */
+ virtual bool isTerminated() const = 0;
+
};
}}}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Executors.h"
+
+#include <decaf/lang/Exception.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/ThreadFactory.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+
+using namespace decaf;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    /**
+     * ThreadFactory returned by Executors::getDefaultThreadFactory().  Threads are
+     * named "pool-N-thread-M", where N is read from the shared poolNumber counter
+     * when the factory is constructed and M counts the threads this factory made.
+     */
+    class DefaultThreadFactory : public ThreadFactory {
+    public:
+
+        // Shared pool sequence counter, allocated by Executors::initialize() and
+        // deleted by Executors::shutdown().
+        static AtomicInteger* poolNumber;
+
+    private:
+
+        //ThreadGroup group;
+        AtomicInteger threadNumber;
+        std::string namePrefix;
+
+    public:
+
+        /**
+         * @throws NullPointerException if the shared pool counter has not been
+         *         created, i.e. Executors::initialize() has not been called.
+         */
+        DefaultThreadFactory() : ThreadFactory(), threadNumber(1), namePrefix() {
+
+            if(DefaultThreadFactory::poolNumber == NULL) {
+                // Carry file, line and a message like the other exceptions thrown
+                // in this library so the failure can be diagnosed.
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Executors has not been initialized, pool counter is NULL." );
+            }
+
+            namePrefix = std::string("pool-") +
+                         Integer::toString(poolNumber->getAndIncrement()) +
+                         "-thread-";
+        }
+
+        /**
+         * Creates a new Thread for the given task, normalizing it to a non-daemon
+         * thread running at Thread::NORM_PRIORITY.
+         *
+         * @param task
+         *      The Runnable handed to the new Thread's constructor.
+         *
+         * @returns the newly allocated Thread instance.
+         */
+        Thread* newThread(Runnable* task) {
+            Thread* thread = new Thread(task, namePrefix + Integer::toString(threadNumber.getAndIncrement()));
+
+            if (thread->isDaemon()) {
+                thread->setDaemon(false);
+            }
+
+            if (thread->getPriority() != Thread::NORM_PRIORITY) {
+                thread->setPriority(Thread::NORM_PRIORITY);
+            }
+
+            return thread;
+        }
+    };
+
+    AtomicInteger* DefaultThreadFactory::poolNumber = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Executors::Executors() {
+ // Intentionally empty.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Executors::~Executors() {
+ // Intentionally empty.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Executors::initialize() {
+    // Only allocate when no counter exists, so a repeated call does not leak
+    // the previously allocated AtomicInteger.
+    if(DefaultThreadFactory::poolNumber == NULL) {
+        DefaultThreadFactory::poolNumber = new AtomicInteger(1);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Executors::shutdown() {
+    delete DefaultThreadFactory::poolNumber;
+
+    // Reset the pointer so the NULL check in DefaultThreadFactory's constructor
+    // detects use after shutdown, a second shutdown() is a harmless no-op, and
+    // initialize() can allocate a fresh counter again.
+    DefaultThreadFactory::poolNumber = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadFactory* Executors::getDefaultThreadFactory() {
+ // Each call heap-allocates a new factory; nothing here retains the pointer,
+ // so presumably the caller is responsible for deleting it - confirm against the header.
+ return new DefaultThreadFactory();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newFixedThreadPool(int nThreads) {
+
+    // Holds the queue until the executor has been constructed successfully so
+    // that it is freed if the ThreadPoolExecutor constructor throws.
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get());
+
+        // The executor was handed the raw queue pointer above, so give up our hold on it.
+        backingQ.release();
+
+        return service;
+
+    // Each handler marks the caught exception and rethrows it in place with
+    // "throw;".  "throw ex;" would copy using the static type of the handler and
+    // slice any more derived exception down to that type.
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(...) {
+        throw Exception();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* Executors::newFixedThreadPool(int nThreads, ThreadFactory* threadFactory) {
+
+    // Holds the queue until the executor has been constructed successfully so
+    // that it is freed if the ThreadPoolExecutor constructor throws.
+    Pointer< BlockingQueue<Runnable*> > backingQ;
+
+    try{
+
+        backingQ.reset(new LinkedBlockingQueue<Runnable*>());
+        ExecutorService* service = new ThreadPoolExecutor(
+            nThreads, nThreads, 0, TimeUnit::MILLISECONDS, backingQ.get(), threadFactory);
+
+        // The executor was handed the raw queue pointer above, so give up our hold on it.
+        backingQ.release();
+
+        return service;
+
+    // Each handler marks the caught exception and rethrows it in place with
+    // "throw;".  "throw ex;" would copy using the static type of the handler and
+    // slice any more derived exception down to that type.
+    } catch(NullPointerException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(IllegalArgumentException& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(Exception& ex) {
+        ex.setMark(__FILE__, __LINE__);
+        throw;
+    } catch(...) {
+        throw Exception();
+    }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h Mon Apr 11 20:22:39 2011
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_EXECUTORS_H_
+#define _DECAF_UTIL_CONCURRENT_EXECUTORS_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+#include <decaf/util/concurrent/ThreadFactory.h>
+
+namespace decaf {
+namespace util {
+namespace concurrent {
+
+ /**
+ * Implements a set of utilities for use with Executors, ExecutorService, ThreadFactory,
+ * and Callable types, as well as providing factory methods for instances of these
+ * types configured for the most common use cases.
+ *
+ * @since 1.0
+ */
+ class DECAF_API Executors {
+ private:
+ // Construction, copy and assignment are declared private.
+ Executors();
+ Executors(const Executors&);
+ Executors& operator= (const Executors&);
+
+ public:
+
+ virtual ~Executors();
+
+ /**
+ * Creates and returns a new ThreadFactory that expresses the default behavior for
+ * ThreadFactories used in Executor classes. The default factory creates a new
+ * non-daemon thread with normal priority and a name whose value is equal to
+ * pool-N-thread-M, where N is the sequence number of this factory, and M is the
+ * sequence number of the thread created by this factory.
+ *
+ * @returns a new instance of the default thread factory used in Executors, the
+ * caller takes ownership of the returned pointer.
+ */
+ static ThreadFactory* getDefaultThreadFactory();
+
+ /**
+ * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming
+ * tasks. The thread pool will use an unbounded queue to store pending tasks. At any
+ * given time the maximum threads in the pool will be equal to the number given to this
+ * factory method. If a thread in the pool dies a new one will be spawned to take its
+ * place in the pool. Tasks that are submitted when all pooled threads are busy will
+ * be held until a thread is freed if the pool has allocated its assigned number of
+ * threads already.
+ *
+ * @param nThreads
+ * The number of threads to assign as the max for the new ExecutorService.
+ *
+ * @returns pointer to a new ExecutorService that is owned by the caller.
+ *
+ * @throws IllegalArgumentException if nThreads is less than or equal to zero.
+ */
+ static ExecutorService* newFixedThreadPool(int nThreads);
+
+ /**
+ * Creates a new ThreadPoolExecutor with a fixed number of threads to process incoming
+ * tasks. The thread pool will use an unbounded queue to store pending tasks. At any
+ * given time the maximum threads in the pool will be equal to the number given to this
+ * factory method. If a thread in the pool dies a new one will be spawned to take its
+ * place in the pool. Tasks that are submitted when all pooled threads are busy will
+ * be held until a thread is freed if the pool has allocated its assigned number of
+ * threads already.
+ *
+ * @param nThreads
+ * The number of threads to assign as the max for the new ExecutorService.
+ * @param threadFactory
+ * Instance of a ThreadFactory that will be used by the Executor to spawn new
+ * worker threads. This parameter cannot be NULL.
+ *
+ * @returns pointer to a new ExecutorService that is owned by the caller.
+ *
+ * @throws NullPointerException if threadFactory is NULL.
+ * @throws IllegalArgumentException if nThreads is less than or equal to zero.
+ */
+ static ExecutorService* newFixedThreadPool(int nThreads, ThreadFactory* threadFactory);
+
+ private:
+ // initialize() and shutdown() are private; decaf::lang::Thread gets access through the friend declaration below.
+ static void initialize();
+ static void shutdown();
+
+ friend class decaf::lang::Thread;
+
+ };
+
+}}}
+
+#endif /* _DECAF_UTIL_CONCURRENT_EXECUTORS_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executors.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp?rev=1091195&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp Mon Apr 11 20:22:39 2011
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RejectedExecutionHandler.h"
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler::RejectedExecutionHandler() {
+ // Empty body: this constructor performs no work of its own.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler::~RejectedExecutionHandler() {
+ // Empty body: this destructor performs no work of its own.
+}
+
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/RejectedExecutionHandler.h Mon Apr 11 20:22:39 2011
@@ -36,7 +36,8 @@ namespace concurrent {
class DECAF_API RejectedExecutionHandler {
public:
- virtual ~RejectedExecutionHandler() {}
+ RejectedExecutionHandler();
+ virtual ~RejectedExecutionHandler();
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
@@ -56,8 +57,7 @@ namespace concurrent {
*
* @throws RejectedExecutionException if there is no remedy.
*/
- virtual void rejectedExecution( Runnable* r, ThreadPoolExecutor* executer )
- throw( RejectedExecutionException ) = 0;
+ virtual void rejectedExecution( decaf::lang::Runnable* r, ThreadPoolExecutor* executer ) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h Mon Apr 11 20:22:39 2011
@@ -20,11 +20,10 @@
#include <decaf/util/Config.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+
namespace decaf {
-namespace lang {
- class Thread;
- class Runnable;
-}
namespace util {
namespace concurrent {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1091195&r1=1091194&r2=1091195&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Mon Apr 11 20:22:39 2011
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <decaf/util/concurrent/ThreadPoolExecutor.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/CountDownLatch.h>
+
+#include "ThreadPoolExecutor.h"
+
+#include <decaf/util/Config.h>
+#include <decaf/util/LinkedList.h>
#include <decaf/util/Timer.h>
#include <decaf/util/TimerTask.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/util/concurrent/RejectedExecutionException.h>
-#include <decaf/util/Config.h>
-#include <decaf/util/LinkedList.h>
+#include <decaf/util/concurrent/RejectedExecutionHandler.h>
+#include <decaf/util/concurrent/Executors.h>
#include <decaf/lang/Pointer.h>
+#include <decaf/lang/Math.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
#include <algorithm>
#include <iostream>
@@ -63,6 +68,7 @@ namespace concurrent{
int maxPoolSize;
int corePoolSize;
long long keepAliveTime;
+ bool coreThreadsCanTimeout;
Pointer< BlockingQueue<decaf::lang::Runnable*> > workQueue;
Mutex mainLock;
CountDownLatch termination;
@@ -70,11 +76,15 @@ namespace concurrent{
long long completedTasks;
int largestPoolSize;
+ Pointer<ThreadFactory> factory;
+ Pointer<RejectedExecutionHandler> rejectionHandler;
+
public:
ExecutorKernel(ThreadPoolExecutor* parent,
int corePoolSize, int maxPoolSize, long long keepAliveTime,
- BlockingQueue<decaf::lang::Runnable*>* workQueue);
+ BlockingQueue<decaf::lang::Runnable*>* workQueue,
+ ThreadFactory* threadFactory, RejectedExecutionHandler* handler);
~ExecutorKernel();
@@ -88,18 +98,24 @@ namespace concurrent{
Runnable* deQueueTask();
- void AllocateThread();
+ bool addWorker();
+
+ int addAllWorkers();
bool isStoppedOrStopping();
void shutdown();
+ void shutdownNow(ArrayList<Runnable*>& unexecutedTasks);
+
bool awaitTermination(long long timeout, const TimeUnit& unit);
void handleWorkerExit(Worker* worker);
void tryTerminate();
+ void drainQueue(ArrayList<Runnable*>& unexecutedTasks);
+
};
class Worker : public lang::Thread {
@@ -137,6 +153,11 @@ namespace concurrent{
Runnable* task = this->kernel->deQueueTask();
if(this->done) {
+
+ if(task != NULL) {
+ delete task;
+ }
+
break;
}
@@ -216,7 +237,122 @@ namespace concurrent{
ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
long long keepAliveTime, const TimeUnit& unit,
BlockingQueue<decaf::lang::Runnable*>* workQueue) :
- kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
+ AbstractExecutorService(),
+ kernel(NULL) {
+
+ try{
+
+ if(workQueue == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+ }
+
+ Pointer<RejectedExecutionHandler> handler(new ThreadPoolExecutor::AbortPolicy());
+ Pointer<ThreadFactory> threadFactory(Executors::getDefaultThreadFactory());
+
+ this->kernel = new ExecutorKernel(
+ this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue,
+ threadFactory.get(), handler.get());
+
+ handler.release();
+ threadFactory.release();
+ }
+ DECAF_CATCH_RETHROW(NullPointerException)
+ DECAF_CATCH_RETHROW(IllegalArgumentException)
+ DECAF_CATCH_RETHROW(Exception)
+ DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       RejectedExecutionHandler* handler) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    // Overload taking a caller supplied rejection handler; the thread factory
+    // is obtained from Executors::getDefaultThreadFactory().
+    try{
+
+        // Reject missing collaborators before anything is allocated.
+        if(NULL == workQueue) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(NULL == handler) {
+            throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL.");
+        }
+
+        // Guard the default factory until the kernel has been built, so it is
+        // not leaked when the kernel constructor throws.
+        Pointer<ThreadFactory> defaultFactory(Executors::getDefaultThreadFactory());
+
+        // The kernel stores the keep alive time in milliseconds.
+        const long long keepAliveMillis = unit.toMillis(keepAliveTime);
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, keepAliveMillis, workQueue,
+            defaultFactory.get(), handler);
+
+        // The kernel holds the factory now; drop the local guard.
+        defaultFactory.release();
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       ThreadFactory* threadFactory) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    // Overload taking a caller supplied thread factory; rejected tasks are
+    // handled by a newly created ThreadPoolExecutor::AbortPolicy.
+    try{
+
+        // Reject missing collaborators before anything is allocated.
+        if(NULL == workQueue) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(NULL == threadFactory) {
+            throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL.");
+        }
+
+        // Guard the default handler until the kernel has been built, so it is
+        // not leaked when the kernel constructor throws.
+        Pointer<RejectedExecutionHandler> abortHandler(new ThreadPoolExecutor::AbortPolicy());
+
+        // The kernel stores the keep alive time in milliseconds.
+        const long long keepAliveMillis = unit.toMillis(keepAliveTime);
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, keepAliveMillis, workQueue,
+            threadFactory, abortHandler.get());
+
+        // The kernel holds the handler now; drop the local guard.
+        abortHandler.release();
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
+                                       long long keepAliveTime, const TimeUnit& unit,
+                                       BlockingQueue<decaf::lang::Runnable*>* workQueue,
+                                       ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
+    AbstractExecutorService(),
+    kernel(NULL) {
+
+    // Overload where the caller supplies the queue, the thread factory and
+    // the rejection handler; nothing is defaulted here.
+    try{
+
+        // The checks run in this order so the reported message is stable when
+        // more than one argument is NULL: queue, then handler, then factory.
+        if(NULL == workQueue) {
+            throw NullPointerException(__FILE__, __LINE__, "The BlockingQueue pointer cannot be NULL.");
+        }
+
+        if(NULL == handler) {
+            throw NullPointerException(__FILE__, __LINE__, "The RejectedExecutionHandler pointer cannot be NULL.");
+        }
+
+        if(NULL == threadFactory) {
+            throw NullPointerException(__FILE__, __LINE__, "The ThreadFactory pointer cannot be NULL.");
+        }
+
+        // The kernel stores the keep alive time in milliseconds.
+        const long long keepAliveMillis = unit.toMillis(keepAliveTime);
+
+        this->kernel = new ExecutorKernel(
+            this, corePoolSize, maxPoolSize, keepAliveMillis, workQueue,
+            threadFactory, handler);
+    }
+    DECAF_CATCH_RETHROW(NullPointerException)
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
}
////////////////////////////////////////////////////////////////////////////////
@@ -225,7 +361,7 @@ ThreadPoolExecutor::~ThreadPoolExecutor(
try{
delete kernel;
}
- DECAF_CATCH_NOTHROW( lang::Exception )
+ DECAF_CATCH_NOTHROW(Exception)
DECAF_CATCHALL_NOTHROW()
}
@@ -257,6 +393,19 @@ void ThreadPoolExecutor::shutdown() {
}
////////////////////////////////////////////////////////////////////////////////
+ArrayList<Runnable*> ThreadPoolExecutor::shutdownNow() {
+
+    // Filled in by the kernel with the tasks it drains from the work queue.
+    ArrayList<Runnable*> pending;
+
+    try{
+        this->kernel->shutdownNow(pending);
+        return pending;
+    }
+    DECAF_CATCH_RETHROW( lang::Exception )
+    DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::awaitTermination(long long timeout, const TimeUnit& unit) {
try{
@@ -282,11 +431,53 @@ int ThreadPoolExecutor::getCorePoolSize(
}
////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setCorePoolSize(int poolSize) {
+
+    // Negative sizes are rejected before any kernel state is touched.
+    if (poolSize < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Pool size given was negative.");
+    }
+
+    synchronized(&this->kernel->mainLock) {
+
+        // Record the new core size while holding the kernel's main lock.
+        this->kernel->corePoolSize = poolSize;
+
+        const bool tooManyWorkers = this->kernel->workers.size() > poolSize;
+
+        if (!tooManyWorkers) {
+
+            // TODO - Create new threads up to the new pool size, unless we are out
+            //        of work or run out while creating.  Sketch of the intended logic,
+            //        where delta is (poolSize - previous core pool size):
+            //
+            //    int target = Math::min(delta, this->kernel->workQueue->size());
+            //    while (target-- > 0 && addWorker(NULL, true)) {
+            //        if (this->kernel->workQueue->isEmpty()) {
+            //            break;
+            //        }
+            //    }
+        } else {
+            // TODO - Once Threads are interruptible wake them up so some can terminate.
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
int ThreadPoolExecutor::getMaximumPoolSize() const {
return this->kernel->maxPoolSize;
}
////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setMaximumPoolSize(int maxSize) {
+
+    // Sets the upper bound on the number of worker threads.
+    //
+    // The maximum must be strictly positive and may not drop below the current
+    // core pool size.  This is the same rule the ExecutorKernel constructor
+    // enforces (maxPoolSize <= 0 is rejected there); a maximum of zero would
+    // leave the pool unable to ever start a worker.
+    //
+    // Throws IllegalArgumentException if maxSize is zero, negative, or smaller
+    // than the core pool size.
+    if (maxSize <= 0 || maxSize < this->kernel->corePoolSize) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Size given was invalid.");
+    }
+
+    this->kernel->maxPoolSize = maxSize;
+
+    if (this->kernel->workers.size() > maxSize) {
+        // TODO - Wake idle worker threads when able to.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
long long ThreadPoolExecutor::getTaskCount() const {
return this->kernel->workQueue->size();
}
@@ -325,6 +516,47 @@ int ThreadPoolExecutor::getLargestPoolSi
}
////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setThreadFactory(ThreadFactory* factory) {
+
+    if (NULL == factory) {
+        throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL ThreadFactory.");
+    }
+
+    // Re-assigning the instance that is already installed is a no-op.
+    const bool isDifferentInstance = (factory != this->kernel->factory);
+
+    if (isDifferentInstance) {
+        // Wrap the new factory and exchange it with the one held by the kernel.
+        // NOTE(review): the previous factory presumably ends up in 'incoming'
+        // and is disposed of when it goes out of scope - confirm against Pointer.
+        Pointer<ThreadFactory> incoming(factory);
+        this->kernel->factory.swap(incoming);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadFactory* ThreadPoolExecutor::getThreadFactory() const {
+    // Hands back the raw pointer held by the kernel's factory Pointer.
+    ThreadFactory* const current = this->kernel->factory.get();
+    return current;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RejectedExecutionHandler* ThreadPoolExecutor::getRejectedExecutionHandler() const {
+    // Hands back the raw pointer held by the kernel's rejectionHandler Pointer.
+    RejectedExecutionHandler* const current = this->kernel->rejectionHandler.get();
+    return current;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setRejectedExecutionHandler(RejectedExecutionHandler* handler) {
+
+    // Replaces the handler the kernel invokes for tasks that cannot be queued.
+    //
+    // Throws NullPointerException if handler is NULL.  The message names the
+    // RejectedExecutionHandler; it previously said "ThreadFactory", a leftover
+    // from setThreadFactory that pointed callers at the wrong argument.
+    if (handler == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Cannot assign a NULL RejectedExecutionHandler.");
+    }
+
+    // Re-assigning the handler that is already installed is a no-op.
+    if (handler != this->kernel->rejectionHandler) {
+        // Wrap the new handler and exchange it with the one held by the kernel.
+        Pointer<RejectedExecutionHandler> temp(handler);
+        this->kernel->rejectionHandler.swap(temp);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BlockingQueue<Runnable*>* ThreadPoolExecutor::getQueue() {
+    // Hands back the raw pointer held by the kernel's workQueue Pointer.
+    BlockingQueue<Runnable*>* const queue = this->kernel->workQueue.get();
+    return queue;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ThreadPoolExecutor::isShutdown() const {
return this->kernel->stopped.get();
}
@@ -335,13 +567,94 @@ bool ThreadPoolExecutor::isTerminated()
}
////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::isTerminating() const {
+
+    // A pool that has not been asked to stop cannot be terminating.
+    if (!this->kernel->isStoppedOrStopping()) {
+        return false;
+    }
+
+    // Stopping or stopped: terminating only until the terminated flag is set.
+    return !this->kernel->terminated.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::allowCoreThreadTimeout(bool value) {
+
+    // Enabling the option is refused while the keep alive time is zero.
+    if (value && this->kernel->keepAliveTime == 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Keep Alive Time must be set to a non-zero value to enable this option.");
+    }
+
+    // Nothing to do when the flag already has the requested value.
+    if (value == this->kernel->coreThreadsCanTimeout) {
+        return;
+    }
+
+    this->kernel->coreThreadsCanTimeout = value;
+
+    if (value) {
+        // TODO - When Threads are interruptible wake workers so they can check timeout.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ThreadPoolExecutor::getKeepAliveTime(const TimeUnit& unit) const {
+    // The kernel keeps the value in milliseconds; convert it to the requested unit.
+    const long long storedMillis = this->kernel->keepAliveTime;
+    return unit.convert(storedMillis, TimeUnit::MILLISECONDS);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::setKeepAliveTime(long long timeout, const TimeUnit& unit) {
+
+    // Updates the keep alive time stored by the kernel (kept in milliseconds).
+    //
+    // Throws IllegalArgumentException if timeout is negative, or if it converts
+    // to zero milliseconds while core thread timeout is enabled.
+    if (timeout < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout value cannot be negative.");
+    }
+
+    // Convert once.  The value is held as long long to match the kernel's
+    // keepAliveTime member; a plain long is only 32 bits wide on some
+    // platforms and would silently truncate large timeouts.
+    const long long keepAliveMillis = unit.toMillis(timeout);
+
+    if (this->kernel->coreThreadsCanTimeout == true && keepAliveMillis == 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__,
+            "Keep Alive Time must be set to a non-zero value when allowCoreThreadsTimeout is enabled.");
+    }
+
+    const long long delta = keepAliveMillis - this->kernel->keepAliveTime;
+    this->kernel->keepAliveTime = keepAliveMillis;
+
+    if (delta < 0) {
+        // TODO - When Threads are interruptible wake workers so they can check timeout.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::allowsCoreThreadTimeout() const {
+    // Reports the flag last stored by allowCoreThreadTimeout().
+    const bool enabled = this->kernel->coreThreadsCanTimeout;
+    return enabled;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::prestartCoreThread() {
+    // Delegates to the kernel and reports whether a worker was added.
+    const bool started = this->kernel->addWorker();
+    return started;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::prestartAllCoreThreads() {
+    // Delegates to the kernel and reports the count it returns.
+    const int startedCount = this->kernel->addAllWorkers();
+    return startedCount;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ThreadPoolExecutor::remove(decaf::lang::Runnable* task) {
+
+    // Ask the work queue to drop the task; the result says whether it did.
+    const bool wasQueued = this->kernel->workQueue->remove(task);
+
+    // Always give the kernel the chance to finish terminating afterwards,
+    // whether or not the task was found.
+    this->kernel->tryTerminate();
+
+    return wasQueued;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::purge() { // Empty body: calling this currently does nothing.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::beforeExecute(Thread* thread DECAF_UNUSED, Runnable* task DECAF_UNUSED) { // Empty body: both arguments are ignored here.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutor::afterExecute(Runnable* task DECAF_UNUSED, decaf::lang::Throwable* error DECAF_UNUSED) { // Empty body: both arguments are ignored here.
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ThreadPoolExecutor::terminated() { // Empty body: calling this does nothing here.
}
////////////////////////////////////////////////////////////////////////////////
ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
int maxPoolSize, long long keepAliveTime,
- BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+ BlockingQueue<decaf::lang::Runnable*>* workQueue,
+ ThreadFactory* threadFactory, RejectedExecutionHandler* handler) :
parent(parent),
workers(),
deadWorkers(),
@@ -353,11 +666,14 @@ ExecutorKernel::ExecutorKernel(ThreadPoo
maxPoolSize(maxPoolSize),
corePoolSize(corePoolSize),
keepAliveTime(keepAliveTime),
- workQueue(workQueue),
+ coreThreadsCanTimeout(false),
+ workQueue(),
mainLock(),
termination(1),
completedTasks(0),
- largestPoolSize(0) {
+ largestPoolSize(0),
+ factory(),
+ rejectionHandler() {
if(corePoolSize < 0 || maxPoolSize <= 0 ||
maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -365,12 +681,16 @@ ExecutorKernel::ExecutorKernel(ThreadPoo
throw IllegalArgumentException(__FILE__, __LINE__, "Argument out of range.");
}
- if(workQueue == NULL) {
- throw NullPointerException(__FILE__, __LINE__, "BlockingQueue pointer was null");
+ if(workQueue == NULL || threadFactory == NULL || handler == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Required parameter was NULL");
}
this->cleanupTimer.scheduleAtFixedRate(
new WorkerKiller(this), TimeUnit::SECONDS.toMillis(10), TimeUnit::SECONDS.toMillis(10));
+
+ this->workQueue.reset(workQueue);
+ this->factory.reset(threadFactory);
+ this->rejectionHandler.reset(handler);
}
////////////////////////////////////////////////////////////////////////////////
@@ -414,7 +734,7 @@ void ExecutorKernel::onTaskStarted(Worke
// cause the number of Task to exceed the number of free threads
// once the Threads got a chance to wake up and service the queue
if( freeThreads.get() == 0 && !workQueue->isEmpty() ) {
- AllocateThread();
+ addWorker();
}
}
}
@@ -470,13 +790,13 @@ void ExecutorKernel::enQueueTask(Runnabl
// If there's nobody open to do work, then create some more
// threads to handle the work.
if( this->freeThreads.get() == 0 ) {
- AllocateThread();
+ addWorker();
}
- }
- // queue the new work.
- if(!this->workQueue->offer(task)) {
- throw RejectedExecutionException(__FILE__, __LINE__, "Task Rejected by work Q");
+ // queue the new work.
+ if(isStoppedOrStopping() || !this->workQueue->offer(task)) {
+ this->rejectionHandler->rejectedExecution(task, this->parent);
+ }
}
}
DECAF_CATCH_RETHROW( Exception )
@@ -490,24 +810,19 @@ Runnable* ExecutorKernel::deQueueTask()
Runnable* task = NULL;
- // Wait for work, wait in a while loop since another thread could
- // be waiting for a lock and get the work before we get woken up
- // from our wait.
- while( !isStoppedOrStopping() ) {
+ while(true) {
- // TODO - Threads aren't interruptible yet.
+ // TODO - Threads aren't interruptible yet, so spin wait.
if(workQueue->poll(task, 10, TimeUnit::MILLISECONDS)) {
break;
}
- }
-
- // Don't give more work if we are closing down
- if(isStoppedOrStopping()) {
- if(task != NULL) {
- delete task;
+ if(isStoppedOrStopping() && workQueue->isEmpty()) {
+ break;
}
+ }
+ if(isStoppedOrStopping() && task == NULL) {
return NULL;
}
@@ -523,12 +838,12 @@ Runnable* ExecutorKernel::deQueueTask()
}
////////////////////////////////////////////////////////////////////////////////
-void ExecutorKernel::AllocateThread() {
+bool ExecutorKernel::addWorker() {
try{
if( this->workers.size() >= this->maxPoolSize ) {
- return;
+ return false;
}
synchronized( &mainLock ) {
@@ -538,6 +853,38 @@ void ExecutorKernel::AllocateThread() {
newWorker->start();
this->largestPoolSize++;
}
+
+ return true;
+ }
+ DECAF_CATCH_RETHROW( lang::Exception )
+ DECAF_CATCHALL_THROW( lang::Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ExecutorKernel::addAllWorkers() {
+
+ try{
+
+ if( this->workers.size() >= this->maxPoolSize ) {
+ return 0;
+ }
+
+ int delta = 0;
+
+ synchronized( &mainLock ) {
+
+ delta = this->maxPoolSize - this->workers.size();
+
+ for(int i = 0; i < delta; ++i) {
+ Worker* newWorker = new Worker(this);
+ this->workers.add(newWorker);
+ freeThreads.incrementAndGet();
+ newWorker->start();
+ this->largestPoolSize++;
+ }
+ }
+
+ return delta;
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
@@ -563,6 +910,29 @@ void ExecutorKernel::shutdown() {
synchronized(&mainLock) {
+ // TODO - When threads are interruptible, we need to interrupt the Queue.
+ //synchronized( workQueue.get() ) {
+ // // Signal the Queue so that all waiters are notified
+ // workQueue->notifyAll();
+ //}
+ }
+
+ this->tryTerminate();
+ this->stopped.set(true);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::shutdownNow(ArrayList<Runnable*>& unexecutedTasks) {
+
+ if(isStoppedOrStopping()) {
+ return;
+ }
+
+ if(this->stopping.compareAndSet(false, true)) {
+
+ synchronized(&mainLock) {
+
Pointer< Iterator<Worker*> > iter(this->workers.iterator());
while(iter->hasNext()) {
@@ -574,13 +944,38 @@ void ExecutorKernel::shutdown() {
// // Signal the Queue so that all waiters are notified
// workQueue->notifyAll();
//}
+
+ this->drainQueue(unexecutedTasks);
}
+ this->tryTerminate();
this->stopped.set(true);
}
}
////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::drainQueue(ArrayList<Runnable*>& unexecutedTasks) {
+
+    // First try to move everything across in one call.
+    this->workQueue->drainTo(unexecutedTasks);
+
+    if (this->workQueue->isEmpty()) {
+        return;
+    }
+
+    // Some Queue implementations can fail in poll and drainTo, so when tasks
+    // are still queued after the bulk drain they are removed one at a time
+    // from a snapshot of the queue's contents.
+    std::vector<Runnable*> leftovers = this->workQueue->toArray();
+
+    for (std::vector<Runnable*>::size_type i = 0; i < leftovers.size(); ++i) {
+
+        Runnable* candidate = leftovers[i];
+
+        // Only hand back tasks that this call actually removed from the queue.
+        if (this->workQueue->remove(candidate)) {
+            unexecutedTasks.add(candidate);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ExecutorKernel::awaitTermination(long long timeout, const TimeUnit& unit) {
if (this->terminated.get() == true) {