You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/03 20:31:00 UTC
svn commit: r543955 - in /activemq/activemq-cpp/trunk/src/decaf/src: main/
main/decaf/lang/ main/decaf/util/concurrent/ test/
test/decaf/util/concurrent/
Author: tabish
Date: Sun Jun 3 11:30:59 2007
New Revision: 543955
URL: http://svn.apache.org/viewvc?view=rev&rev=543955
Log:
https://issues.apache.org/activemq/browse/AMQCPP-103
Building up the Decaf Library
Added:
activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.cpp
activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.h
activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.cpp
activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.h
Modified:
activemq/activemq-cpp/trunk/src/decaf/src/main/Makefile.am
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/lang/Runnable.h
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp
activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h
activemq/activemq-cpp/trunk/src/decaf/src/test/Makefile.am
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/Makefile.am?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/Makefile.am Sun Jun 3 11:30:59 2007
@@ -33,6 +33,8 @@
decaf/io/BlockingByteArrayInputStream.cpp \
decaf/util/concurrent/Mutex.cpp \
decaf/util/concurrent/CountDownLatch.cpp \
+ decaf/util/concurrent/PooledThread.cpp \
+ decaf/util/concurrent/ThreadPool.cpp \
decaf/util/Date.cpp \
decaf/util/Guid.cpp \
decaf/util/StringTokenizer.cpp \
@@ -94,6 +96,10 @@
decaf/util/concurrent/CountDownLatch.h \
decaf/util/concurrent/Synchronizable.h \
decaf/util/concurrent/Mutex.h \
+ decaf/util/concurrent/PooledThread.h \
+ decaf/util/concurrent/PooledThreadListener.h \
+ decaf/util/concurrent/TaskListener.h \
+ decaf/util/concurrent/ThreadPool.h \
decaf/util/Date.h \
decaf/util/Guid.h \
decaf/util/Map.h \
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/lang/Runnable.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/lang/Runnable.h?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/lang/Runnable.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/lang/Runnable.h Sun Jun 3 11:30:59 2007
@@ -17,6 +17,8 @@
#ifndef _DECAF_LANG_RUNNABLE_H_
#define _DECAF_LANG_RUNNABLE_H_
+#include <decaf/util/Config.h>
+
namespace decaf{
namespace lang{
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.cpp Sun Jun 3 11:30:59 2007
@@ -28,7 +28,7 @@
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
-LOGCMS_INITIALIZE(logger, PooledThread, "com.activemq.concurrent.PooledThread")
+LOGDECAF_INITIALIZE(logger, PooledThread, "com.activemq.concurrent.PooledThread")
////////////////////////////////////////////////////////////////////////////////
PooledThread::PooledThread(ThreadPool* pool)
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThread.h Sun Jun 3 11:30:59 2007
@@ -20,6 +20,7 @@
#include <decaf/lang/Thread.h>
#include <decaf/util/concurrent/PooledThreadListener.h>
#include <decaf/util/logging/LoggerDefines.h>
+#include <decaf/util/Config.h>
#include <decaf/lang/Exception.h>
@@ -29,7 +30,7 @@
class ThreadPool;
- class PooledThread : public lang::Thread
+ class DECAF_API PooledThread : public lang::Thread
{
private:
@@ -46,7 +47,7 @@
ThreadPool* pool;
// Logger Init
- LOGCMS_DECLARE(logger)
+ LOGDECAF_DECLARE(logger)
public:
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/PooledThreadListener.h Sun Jun 3 11:30:59 2007
@@ -18,6 +18,7 @@
#define _DECAF_UTIL_CONCURRENT_POOLEDTHREADLISTENER_H_
#include <decaf/lang/Exception.h>
+#include <decaf/util/Config.h>
namespace decaf{
namespace util{
@@ -25,7 +26,7 @@
class PooledThread;
- class PooledThreadListener
+ class DECAF_API PooledThreadListener
{
public:
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/TaskListener.h Sun Jun 3 11:30:59 2007
@@ -19,12 +19,13 @@
#include <decaf/lang/Runnable.h>
#include <decaf/lang/Exception.h>
+#include <decaf/util/Config.h>
namespace decaf{
namespace util{
namespace concurrent{
- class TaskListener
+ class DECAF_API TaskListener
{
public:
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.cpp Sun Jun 3 11:30:59 2007
@@ -34,8 +34,8 @@
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
-LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
-LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")
+LOGDECAF_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
+LOGDECAF_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")
////////////////////////////////////////////////////////////////////////////////
ThreadPool ThreadPool::instance;
@@ -94,7 +94,7 @@
{
try
{
- if(!task.first || !task.second)
+ if( !task.first || !task.second )
{
throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
"ThreadPool::QueueTask - Invalid args for Task");
Modified: activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/main/decaf/util/concurrent/ThreadPool.h Sun Jun 3 11:30:59 2007
@@ -23,7 +23,8 @@
#include <decaf/util/concurrent/TaskListener.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/Queue.h>
-#include <deacf/util/logging/LoggerDefines.h>
+#include <decaf/util/logging/LoggerDefines.h>
+#include <decaf/util/Config.h>
#include <vector>
@@ -47,7 +48,7 @@
* object that implements the <code>Runnable</code> insterface and
* one of the worker threads will executing it in its thread context.
*/
- class ThreadPool : public PooledThreadListener
+ class DECAF_API ThreadPool : public PooledThreadListener
{
public:
@@ -56,7 +57,7 @@
static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
// Types
- typedef std::pair<Runnable*, TaskListener*> Task;
+ typedef std::pair<lang::Runnable*, TaskListener*> Task;
private:
@@ -82,8 +83,8 @@
Mutex poolLock;
// Logger Init
- LOGCMS_DECLARE(logger)
- LOGCMS_DECLARE(marker)
+ LOGDECAF_DECLARE(logger)
+ LOGDECAF_DECLARE(marker)
private: // Statics
Modified: activemq/activemq-cpp/trunk/src/decaf/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/test/Makefile.am?view=diff&rev=543955&r1=543954&r2=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/decaf/src/test/Makefile.am Sun Jun 3 11:30:59 2007
@@ -24,7 +24,21 @@
decaf/util/DateTest.cpp \
decaf/util/GuidTest.cpp \
decaf/util/concurrent/CountDownLatchTest.cpp \
+ decaf/util/concurrent/MutexTest.cpp \
+ decaf/util/concurrent/ThreadPoolTest.cpp \
main.cpp
+
+h_sources = \
+ decaf/lang/BooleanTest.h \
+ decaf/lang/IntegerTest.h \
+ decaf/lang/LongTest.h \
+ decaf/lang/ThreadTest.h \
+ decaf/util/StringTokenizerTest.h \
+ decaf/util/DateTest.h \
+ decaf/util/GuidTest.h \
+ decaf/util/concurrent/CountDownLatchTest.h \
+ decaf/util/concurrent/MutexTest.h \
+ decaf/util/concurrent/ThreadPoolTest.h
## Compile this as part of make check
check_PROGRAMS = decaf-test
Added: activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.cpp?view=auto&rev=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.cpp (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.cpp Sun Jun 3 11:30:59 2007
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MutexTest.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::MutexTest );
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+///////////////////////////////////////////////////////////////////////////////
+/**
+ * Verifies that a timed wait on a Mutex returns once its timeout elapses.
+ * MyTimedWaitingThread waits for 2000 ms and this test never notifies it,
+ * so the time between start() and the end of join() should be roughly
+ * two seconds.
+ */
+void MutexTest::testTimedWait(){
+
+    try
+    {
+        MyTimedWaitingThread test;
+        time_t startTime = time( NULL );
+        test.start();
+        test.join();
+        time_t endTime = time( NULL );
+
+        time_t delta = endTime - startTime;
+
+        // time() only has one-second resolution, so accept a second of
+        // slack on either side of the 2000 ms timeout.
+        CPPUNIT_ASSERT( delta >= 1 && delta <= 3 );
+    }
+    catch(lang::Exception& ex)
+    {
+        // An exception means the scenario never ran to completion; report
+        // it as a failure instead of printing it and letting the test pass.
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
+
+/**
+ * Verifies wait()/notify() between two threads.  The worker blocks in
+ * wait(); this thread adds 100 to value while holding the lock and then
+ * notifies, after which the worker multiplies value by 25, giving 2500.
+ */
+void MutexTest::testWait(){
+
+    try
+    {
+        MyWaitingThread test;
+        test.start();
+
+        // Give the worker time to reach its wait() before we notify.
+        Thread::sleep( 1000 );
+
+        synchronized( &test )
+        {
+            for( int ix=0; ix<100; ix++ ){
+                test.value += 1;
+            }
+
+            test.notify();
+        }
+
+        test.join();
+
+        CPPUNIT_ASSERT( test.value == 2500 );
+    } catch( lang::Exception& ex ) {
+        // Previously the exception was only marked and then dropped, which
+        // let the test pass without ever reaching the assertion.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
+
+/**
+ * Basic mutual-exclusion check.  The worker is started while this thread
+ * already holds the worker's lock, so the worker's "value * 25" can only
+ * run after the 100 increments below: 100 * 25 == 2500.
+ */
+void MutexTest::test()
+{
+    MyThread test;
+
+    synchronized(&test){
+
+        // run() needs the same lock, so it blocks until this scope ends.
+        test.start();
+
+        int remaining = 100;
+        while( remaining > 0 ){
+            test.value += 1;
+            --remaining;
+        }
+    }
+
+    test.join();
+
+    CPPUNIT_ASSERT( test.value == 2500 );
+}
+
+/**
+ * Verifies Mutex::notify(): one notify() releases exactly one of the
+ * waiting MyNotifiedThread workers, and one further notify() per remaining
+ * worker releases all the others.
+ */
+void MutexTest::testNotify()
+{
+    try{
+        Mutex mutex;
+        Mutex started;
+        Mutex completed;
+
+        const int numThreads = 30;
+        MyNotifiedThread* threads[numThreads];
+
+        // Create and start all the threads.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix] = new MyNotifiedThread( &mutex, &started, &completed );
+            threads[ix]->start();
+        }
+
+        // Each worker notifies 'started' just before it waits on 'mutex'.
+        // Every pass here ends on such a notification or after 30 ms.
+        synchronized( &started )
+        {
+            int count = 0;
+
+            while( count < ( numThreads ) )
+            {
+                started.wait( 30 );
+                count++;
+            }
+        }
+
+        synchronized(&mutex)
+        {
+            mutex.notify();
+        }
+
+        Thread::sleep( 1000 );
+
+        int counter = 0;
+        for( int ix=0; ix<numThreads; ++ix ){
+            if( threads[ix]->done ){
+                counter++;
+            }
+        }
+
+        // Make sure only 1 thread was notified.
+        CPPUNIT_ASSERT( counter == 1 );
+
+        synchronized(&mutex)
+        {
+            // Notify the remaining threads, one notify() each.
+            for( int ix=0; ix<numThreads-1; ++ix ){
+                mutex.notify();
+            }
+        }
+
+        // NOTE(review): no worker notifies 'started' at this stage (they do
+        // so only once, before waiting), so this loop acts as a bounded
+        // delay of up to numThreads * 30 ms.  'completed' may have been
+        // intended here - confirm before changing.
+        synchronized( &started )
+        {
+            int count = 0;
+
+            while( count < ( numThreads ) )
+            {
+                started.wait( 30 );
+                count++;
+            }
+        }
+
+        int numComplete = 0;
+        for( int ix=0; ix<numThreads; ++ix ){
+            if( threads[ix]->done ){
+                numComplete++;
+            }
+        }
+        CPPUNIT_ASSERT( numComplete == numThreads );
+
+        synchronized( &mutex )
+        {
+            mutex.wait( 5 );
+        }
+
+        synchronized( &mutex )
+        {
+            mutex.notifyAll();
+        }
+
+        // Delete all the threads.  Join first, as testNotifyAll does, so no
+        // worker is still running inside an object being destroyed.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix]->join();
+            delete threads[ix];
+        }
+
+    }catch( lang::Exception& ex ){
+        // Fail instead of silently passing when the library throws.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
+
+/**
+ * Verifies Mutex::notifyAll(): none of the 100 MyNotifiedThread workers
+ * may finish before the notification, and all of them must be finished
+ * after a single notifyAll().
+ */
+void MutexTest::testNotifyAll()
+{
+    try{
+        Mutex mutex;
+        Mutex started;
+        Mutex completed;
+
+        const int numThreads = 100;
+        MyNotifiedThread* threads[numThreads];
+
+        // Create and start all the threads.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix] = new MyNotifiedThread( &mutex, &started, &completed );
+            threads[ix]->start();
+        }
+
+        // Each worker notifies 'started' just before it waits on 'mutex'.
+        // Every pass here ends on such a notification or after 30 ms.
+        synchronized( &started )
+        {
+            int count = 0;
+
+            while( count < ( numThreads ) )
+            {
+                started.wait( 30 );
+                count++;
+            }
+        }
+
+        for( int ix=0; ix<numThreads; ++ix )
+        {
+            if( threads[ix]->done == true ){
+                printf("threads[%d] is done prematurely\n", ix );
+            }
+            CPPUNIT_ASSERT( threads[ix]->done == false );
+        }
+
+        // Notify all threads.
+        synchronized( &mutex ){
+            mutex.notifyAll();
+        }
+
+        // Each worker notifies 'completed' once it has set done; wait for
+        // up to numThreads notifications / 30 ms timeouts.
+        synchronized( &completed )
+        {
+            int count = 0;
+
+            while( count < ( numThreads ) )
+            {
+                completed.wait( 30 );
+                count++;
+            }
+        }
+
+        int numComplete = 0;
+        for( int ix=0; ix<numThreads; ++ix ){
+            if( threads[ix]->done ){
+                numComplete++;
+            }
+        }
+        //printf("numComplete: %d, numThreads: %d\n", numComplete, numThreads );
+        CPPUNIT_ASSERT( numComplete == numThreads );
+
+        // Delete all the threads.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix]->join();
+            delete threads[ix];
+        }
+
+    }catch( lang::Exception& ex ){
+        // Fail instead of silently passing when the library throws.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
+
+/**
+ * Verifies that a Mutex can be locked twice by the same thread.  Every
+ * MyRecursiveLockThread locks the mutex twice and waits; this thread also
+ * takes the lock twice before calling notifyAll(), after which all workers
+ * must report done.
+ */
+void MutexTest::testRecursiveLock()
+{
+    try{
+        Mutex mutex;
+
+        const int numThreads = 30;
+        MyRecursiveLockThread* threads[numThreads];
+
+        // Create and start all the threads.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix] = new MyRecursiveLockThread( &mutex );
+            threads[ix]->start();
+        }
+
+        // Sleep so all the threads can get to the wait.
+        Thread::sleep( 1000 );
+
+        for( int ix=0; ix<numThreads; ++ix ){
+            if( threads[ix]->done == true ){
+                std::cout << "threads[" << ix
+                          << "] is done prematurely\n";
+            }
+            CPPUNIT_ASSERT( threads[ix]->done == false );
+        }
+
+        // Notify all threads.
+        synchronized( &mutex )
+        {
+            synchronized( &mutex )
+            {
+                mutex.notifyAll();
+            }
+        }
+
+        // Sleep to give the threads time to wake up.
+        Thread::sleep( 1000 );
+
+        for( int ix=0; ix<numThreads; ++ix ){
+            if( threads[ix]->done != true ){
+                std::cout<< "threads[" << ix << "] is not done\n";
+            }
+            CPPUNIT_ASSERT( threads[ix]->done == true );
+        }
+
+        // Delete all the threads.  Join first, as testNotifyAll does, so no
+        // worker is still running inside an object being destroyed.
+        for( int ix=0; ix<numThreads; ++ix ){
+            threads[ix]->join();
+            delete threads[ix];
+        }
+
+    }catch( lang::Exception& ex ){
+        // Fail instead of silently passing when the library throws.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
+
+/**
+ * Verifies waiting on one mutex while another is held.  The worker locks
+ * mutex1, then mutex2, and waits on mutex2; this thread notifies mutex2
+ * and expects the worker to finish.
+ */
+void MutexTest::testDoubleLock()
+{
+    try{
+        Mutex mutex1;
+        Mutex mutex2;
+
+        MyDoubleLockThread thread(&mutex1, &mutex2);
+
+        thread.start();
+
+        // Let the thread get both locks
+        Thread::sleep( 200 );
+
+        // Lock mutex 2, thread is waiting on it
+        synchronized(&mutex2)
+        {
+            mutex2.notify();
+        }
+
+        // Let the thread die
+        thread.join();
+
+        CPPUNIT_ASSERT( thread.done );
+    }catch( lang::Exception& ex ){
+        // Fail instead of silently passing when the library throws.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
Added: activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.h?view=auto&rev=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.h (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/MutexTest.h Sun Jun 3 11:30:59 2007
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_MUTEXTEST_H_
+#define _DECAF_UTIL_CONCURRENT_MUTEXTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <time.h>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+ class MutexTest : public CppUnit::TestFixture {
+
+ CPPUNIT_TEST_SUITE( MutexTest );
+ CPPUNIT_TEST( test );
+ CPPUNIT_TEST( testWait );
+ CPPUNIT_TEST( testTimedWait );
+ CPPUNIT_TEST( testNotify );
+ CPPUNIT_TEST( testNotifyAll );
+ CPPUNIT_TEST( testRecursiveLock );
+ CPPUNIT_TEST( testDoubleLock );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+        /**
+         * Worker used by MutexTest::test().  It implements Synchronizable
+         * by forwarding every call to a private Mutex, and its run()
+         * multiplies value by 25 while holding that lock.
+         */
+        class MyThread
+        :
+            public lang::Thread,
+            public Synchronizable{
+
+        private:
+
+            // Backs the Synchronizable interface below.
+            Mutex mutex;
+
+        public:
+
+            // Counter the test increments and run() scales.
+            int value;
+
+            MyThread() : value( 0 ) {}
+            virtual ~MyThread(){}
+
+            // Synchronizable: plain delegation to the private mutex.
+            virtual void lock() throw(lang::Exception){ mutex.lock(); }
+            virtual void unlock() throw(lang::Exception){ mutex.unlock(); }
+            virtual void wait() throw(lang::Exception){ mutex.wait(); }
+            virtual void wait(unsigned long millisecs) throw(lang::Exception){
+                mutex.wait( millisecs );
+            }
+            virtual void notify() throw(lang::Exception){ mutex.notify(); }
+            virtual void notifyAll() throw(lang::Exception){ mutex.notifyAll(); }
+
+            /** Takes this object's lock for the duration of the update. */
+            virtual void run(){
+
+                Lock guard( this );
+
+                value = value * 25;
+            }
+
+        };
+
+        /**
+         * Worker used by MutexTest::testWait().  It implements
+         * Synchronizable by forwarding to a private Mutex; run() waits on
+         * that lock until notified and then multiplies value by 25.
+         */
+        class MyWaitingThread
+        :
+            public lang::Thread,
+            public Synchronizable{
+
+        private:
+
+            // Backs the Synchronizable interface below.
+            Mutex mutex;
+
+        public:
+
+            // Counter the test increments and run() scales.
+            int value;
+
+            MyWaitingThread() : value( 0 ) {}
+            virtual ~MyWaitingThread(){}
+
+            // Synchronizable: plain delegation to the private mutex.
+            virtual void lock() throw(lang::Exception){ mutex.lock(); }
+            virtual void unlock() throw(lang::Exception){ mutex.unlock(); }
+            virtual void wait() throw(lang::Exception){ mutex.wait(); }
+            virtual void wait(unsigned long millisecs) throw(lang::Exception){
+                mutex.wait( millisecs );
+            }
+            virtual void notify() throw(lang::Exception){ mutex.notify(); }
+            virtual void notifyAll() throw(lang::Exception){ mutex.notifyAll(); }
+
+            /**
+             * Blocks in an untimed wait() and, once released, scales value.
+             * Exceptions are marked and dropped so they never leave the
+             * thread entry point.
+             */
+            virtual void run(){
+
+                try
+                {
+                    synchronized(this)
+                    {
+                        this->wait();
+
+                        std::cout.flush();
+
+                        value = value * 25;
+                    }
+                }
+                catch(lang::Exception& ex)
+                {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+        /**
+         * Worker used by MutexTest::testTimedWait().  It implements
+         * Synchronizable by forwarding to a private Mutex; run() performs a
+         * 2000 ms timed wait on that lock and then sets value to 666.
+         */
+        class MyTimedWaitingThread
+        :
+            public lang::Thread,
+            public Synchronizable{
+
+        private:
+
+            // Backs the Synchronizable interface below.
+            Mutex mutex;
+
+        public:
+
+            // Set to 666 by run() once the timed wait has returned.
+            int value;
+
+            MyTimedWaitingThread() : value( 0 ) {}
+            virtual ~MyTimedWaitingThread(){}
+
+            // Synchronizable: plain delegation to the private mutex.
+            virtual void lock() throw(lang::Exception){ mutex.lock(); }
+            virtual void unlock() throw(lang::Exception){ mutex.unlock(); }
+            virtual void wait() throw(lang::Exception){ mutex.wait(); }
+            virtual void wait(unsigned long millisecs) throw(lang::Exception){
+                mutex.wait( millisecs );
+            }
+            virtual void notify() throw(lang::Exception){ mutex.notify(); }
+            virtual void notifyAll() throw(lang::Exception){ mutex.notifyAll(); }
+
+            /**
+             * Waits up to 2000 ms on this object's lock, then records 666.
+             * Exceptions are marked and dropped so they never leave the
+             * thread entry point.
+             */
+            virtual void run(){
+
+                try
+                {
+                    synchronized(this)
+                    {
+                        this->wait(2000);
+
+                        value = 666;
+                    }
+                }
+                catch(lang::Exception& ex)
+                {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+        /**
+         * Worker used by testNotify() and testNotifyAll().  All workers
+         * share three externally owned mutexes: 'mutex' is the one they
+         * wait on (and the one the Synchronizable methods forward to),
+         * 'started' is notified just before waiting, and 'completed' is
+         * notified after the wait returned and done was set.
+         */
+        class MyNotifiedThread
+        :
+            public lang::Thread,
+            public Synchronizable{
+
+        public:
+
+            // True once the wait on 'mutex' has returned.
+            // NOTE(review): read by the test thread without holding a
+            // lock - confirm this is acceptable for the target platforms.
+            bool done;
+            Mutex* mutex;
+            Mutex* started;
+            Mutex* completed;
+
+        public:
+
+            // Not used by run(); initialised so it never holds garbage.
+            int value;
+
+            /**
+             * @param mutex     lock the worker waits on; not owned.
+             * @param started   notified right before the worker waits; not owned.
+             * @param completed notified after the worker was released; not owned.
+             */
+            MyNotifiedThread(Mutex* mutex, Mutex* started, Mutex* completed )
+            :   done( false ),
+                mutex( mutex ),
+                started( started ),
+                completed( completed ),
+                value( 0 ) {
+            }
+            virtual ~MyNotifiedThread(){}
+
+            // Synchronizable: plain delegation to the shared 'mutex'.
+            virtual void lock() throw(lang::Exception){
+                mutex->lock();
+            }
+            virtual void unlock() throw(lang::Exception){
+                mutex->unlock();
+            }
+            virtual void wait() throw(lang::Exception){
+                mutex->wait();
+            }
+            virtual void wait(unsigned long millisecs) throw(lang::Exception){
+                mutex->wait( millisecs );
+            }
+            virtual void notify() throw(lang::Exception){
+                mutex->notify();
+            }
+            virtual void notifyAll() throw(lang::Exception){
+                mutex->notifyAll();
+            }
+
+            /**
+             * Signals 'started', waits on 'mutex', then sets done and
+             * signals 'completed'.  Exceptions are marked and dropped so
+             * they never leave the thread entry point.
+             */
+            virtual void run(){
+
+                try
+                {
+                    done = false;
+                    synchronized(this)
+                    {
+                        // Signalled while 'mutex' is held, so the notifier
+                        // cannot take 'mutex' until wait() releases it.
+                        synchronized( started )
+                        {
+                            started->notify();
+                        }
+
+                        this->wait();
+                        done = true;
+
+                        synchronized( completed )
+                        {
+                            completed->notify();
+                        }
+                    }
+                }
+                catch(lang::Exception& ex)
+                {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+        /**
+         * Worker used by testRecursiveLock().  It locks the shared mutex
+         * twice from the same thread, waits on it, and sets done once the
+         * wait has returned.
+         */
+        class MyRecursiveLockThread
+        :
+            public lang::Thread,
+            public Synchronizable{
+
+        public:
+
+            // True once the wait on 'mutex' has returned.
+            // NOTE(review): read by the test thread without holding a
+            // lock - confirm this is acceptable for the target platforms.
+            bool done;
+            Mutex* mutex;
+
+        public:
+
+            // Not used by run(); initialised so it never holds garbage.
+            int value;
+
+            /** @param mutex lock shared with the test; not owned. */
+            MyRecursiveLockThread(Mutex* mutex)
+            :   done( false ),
+                mutex( mutex ),
+                value( 0 ) {
+            }
+            virtual ~MyRecursiveLockThread(){}
+
+            // Synchronizable: plain delegation to the shared 'mutex'.
+            virtual void lock() throw(lang::Exception){
+                mutex->lock();
+            }
+            virtual void unlock() throw(lang::Exception){
+                mutex->unlock();
+            }
+            virtual void wait() throw(lang::Exception){
+                mutex->wait();
+            }
+            virtual void wait(unsigned long millisecs) throw(lang::Exception){
+                mutex->wait( millisecs );
+            }
+            virtual void notify() throw(lang::Exception){
+                mutex->notify();
+            }
+            virtual void notifyAll() throw(lang::Exception){
+                mutex->notifyAll();
+            }
+
+            /**
+             * Takes the lock twice, waits, and records completion.
+             * Exceptions are marked and dropped so they never leave the
+             * thread entry point.
+             */
+            virtual void run(){
+
+                try
+                {
+                    done = false;
+                    synchronized(this)
+                    {
+                        synchronized(this)
+                        {
+                            this->wait();
+                            done = true;
+                        }
+                    }
+                }
+                catch(lang::Exception& ex)
+                {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+        /**
+         * Worker used by testDoubleLock().  It locks mutex1, then mutex2,
+         * waits on mutex2 while still holding mutex1, and sets done once
+         * the wait has returned.
+         */
+        class MyDoubleLockThread
+        :
+            public lang::Thread
+        {
+
+        public:
+
+            // True once the wait on mutex2 has returned.
+            bool done;
+            Mutex* mutex1;
+            Mutex* mutex2;
+
+        public:
+
+            // Not used by run(); initialised so it never holds garbage.
+            int value;
+
+            /**
+             * @param mutex1 outer lock, held for the whole wait; not owned.
+             * @param mutex2 inner lock the worker waits on; not owned.
+             */
+            MyDoubleLockThread(Mutex* mutex1, Mutex* mutex2)
+            :   done( false ),
+                mutex1( mutex1 ),
+                mutex2( mutex2 ),
+                value( 0 )
+            {
+            }
+
+            virtual ~MyDoubleLockThread(){}
+
+            /**
+             * Exceptions are marked and dropped so they never leave the
+             * thread entry point.
+             */
+            virtual void run(){
+
+                try
+                {
+                    done = false;
+                    synchronized(mutex1)
+                    {
+                        synchronized(mutex2)
+                        {
+                            mutex2->wait();
+                            done = true;
+                        }
+                    }
+                }
+                catch(lang::Exception& ex)
+                {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+ public:
+
+ virtual ~MutexTest(){}
+ virtual void setUp(){}
+ virtual void tearDown(){}
+
+ void testTimedWait();
+ void testWait();
+ void test();
+ void testNotify();
+ void testNotifyAll();
+ void testRecursiveLock();
+ void testDoubleLock();
+
+ };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_MUTEXTEST_H_*/
Added: activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.cpp?view=auto&rev=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.cpp (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.cpp Sun Jun 3 11:30:59 2007
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ThreadPoolTest.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( decaf::util::concurrent::ThreadPoolTest );
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+///////////////////////////////////////////////////////////////////////////////
+// Runs three MyTask objects on the pool returned by ThreadPool::getInstance(),
+// with this fixture registered as the listener of each task, then checks
+// the task results and the pool's size/limit accessors.
+void ThreadPoolTest::test1()
+{
+ // onTaskComplete() counts this latch down once per finished task.
+ // NOTE(review): this->latch still points at myLatch after this function
+ // returns and myLatch is destroyed - confirm nothing can report a
+ // completion to this fixture afterwards.
+ CountDownLatch myLatch( 3 );
+ this->latch = &myLatch;
+
+ MyTask task1( 1 );
+ MyTask task2( 2 );
+ MyTask task3( 3 );
+
+ this->complete = 0;
+ this->tasksToComplete = 3;
+
+ ThreadPool* pool = ThreadPool::getInstance();
+
+ pool->queueTask( ThreadPool::Task( &task1, this ) );
+ pool->queueTask( ThreadPool::Task( &task2, this ) );
+ pool->queueTask( ThreadPool::Task( &task3, this ) );
+
+ // Wait for them to finish, if we can't do this in 30 seconds then
+ // there's probably something really wrong.
+ myLatch.await( 30000 );
+
+ CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+
+ // MyTask::run() adds 100 to the value given at construction.
+ CPPUNIT_ASSERT( task1.value == 101 );
+ CPPUNIT_ASSERT( task2.value == 102 );
+ CPPUNIT_ASSERT( task3.value == 103 );
+
+ CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
+ CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+
+ CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+ CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+
+ // NOTE(review): these setters change the shared instance and are not
+ // restored here - confirm no later test relies on the defaults.
+ pool->setMaxThreads(50);
+ pool->setBlockSize(50);
+
+ CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
+ CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+
+ // Give it a little time to create all those threads.
+ // Polls every 100 ms, at most 1000 times, until every pooled thread is free.
+ for( int i = 0; i < 1000; ++i ) {
+ if( pool->getFreeThreadCount() == pool->getPoolSize() ) {
+ break;
+ }
+
+ Thread::sleep( 100 );
+ }
+
+ CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
+ CPPUNIT_ASSERT( this->caughtEx == false );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+/**
+ * Exercises a locally constructed ThreadPool capped at three threads.
+ * Four MyWaitingTask objects are queued: three are expected to start at
+ * once and block on myMutex, leaving the fourth in the backlog until the
+ * first three have been released.
+ */
+void ThreadPoolTest::test2() {
+
+    try
+    {
+        ThreadPool pool;
+        Mutex myMutex;
+
+        CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+        CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+        pool.setMaxThreads(3);
+        pool.setBlockSize(1);
+        CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
+        CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
+        CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
+
+        // Asking for four threads must be capped at the maximum of three.
+        pool.reserve( 4 );
+        CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
+        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
+
+        CountDownLatch startedLatch1(3);  // First three should go right away
+        CountDownLatch startedLatch2(1);  // The fourth one goes after others finish
+        CountDownLatch doneLatch(4);      // All should be done when we are at the end.
+
+        this->latch = &doneLatch;
+
+        MyWaitingTask task1( &myMutex, &startedLatch1 );
+        MyWaitingTask task2( &myMutex, &startedLatch1 );
+        MyWaitingTask task3( &myMutex, &startedLatch1 );
+        MyWaitingTask task4( &myMutex, &startedLatch2 );
+
+        this->complete = 0;
+        this->tasksToComplete = 4;
+
+        pool.queueTask( ThreadPool::Task( &task1, this ) );
+        pool.queueTask( ThreadPool::Task( &task2, this ) );
+        pool.queueTask( ThreadPool::Task( &task3, this ) );
+        pool.queueTask( ThreadPool::Task( &task4, this ) );
+
+        // Wait 30 seconds, then we let it fail because something is
+        // probably very wrong.
+        startedLatch1.await( 30000 );
+
+        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 );
+        CPPUNIT_ASSERT( pool.getBacklog() == 1 );
+
+        // Wake up the tasks.  Each task counts its latch down while it
+        // holds myMutex, so taking myMutex here cannot succeed until the
+        // tasks have released it by entering wait().
+        synchronized(&myMutex) {
+            myMutex.notifyAll();
+        }
+
+        // Wait 30 seconds, then we let it fail because something is
+        // probably very wrong.
+        startedLatch2.await( 30000 );
+
+        // Wake up the last task.
+        synchronized(&myMutex) {
+            myMutex.notifyAll();
+        }
+
+        // Wait for them to finish, if it takes longer than 30 seconds
+        // something is not right.
+        doneLatch.await( 30000 );
+
+        CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+        CPPUNIT_ASSERT( this->caughtEx == false );
+    }
+    catch( lang::Exception& ex ) {
+        // Fail instead of silently passing when the library throws.
+        ex.setMark( __FILE__, __LINE__ );
+        CPPUNIT_FAIL( ex.getMessage() );
+    }
+}
Added: activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.h?view=auto&rev=543955
==============================================================================
--- activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.h (added)
+++ activemq/activemq-cpp/trunk/src/decaf/src/test/decaf/util/concurrent/ThreadPoolTest.h Sun Jun 3 11:30:59 2007
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_
+#define _DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/concurrent/ThreadPool.h>
+#include <decaf/util/concurrent/TaskListener.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/Config.h>
+
+namespace decaf{
+namespace util{
+namespace concurrent{
+
+    /**
+     * CppUnit fixture for ThreadPool.  The fixture is itself the
+     * TaskListener handed to the pool with every queued task, so it counts
+     * completions and records whether any task reported an exception.
+     */
+    class ThreadPoolTest :
+        public CppUnit::TestFixture,
+        public TaskListener
+    {
+        CPPUNIT_TEST_SUITE( ThreadPoolTest );
+        CPPUNIT_TEST( test1 );
+        CPPUNIT_TEST( test2 );
+        CPPUNIT_TEST_SUITE_END();
+
+        // Number of tasks the running test has queued.
+        int tasksToComplete;
+        // Number of onTaskComplete() callbacks received; guarded by mutex.
+        int complete;
+        // Protects 'complete' against concurrent listener callbacks.
+        Mutex mutex;
+        // Set when onTaskException() is called.
+        bool caughtEx;
+        // Latch of the running test, counted down per completed task.
+        // Points at a local of the test function; not owned.
+        CountDownLatch* latch;
+
+    public:
+
+        ThreadPoolTest() {
+            complete = 0;
+            tasksToComplete = 0;
+            caughtEx = false;
+            latch = NULL;
+        }
+
+        virtual ~ThreadPoolTest() {}
+
+        /**
+         * TaskListener callback: records one finished task and counts the
+         * current latch down, if one is set.
+         *
+         * The increment is done under 'mutex' because test2 has several
+         * tasks in flight at once.
+         * NOTE(review): assumes the pool invokes this from its worker
+         * threads - confirm against PooledThread.
+         */
+        virtual void onTaskComplete( lang::Runnable* task DECAF_UNUSED)
+        {
+            try{
+
+                synchronized( &mutex ) {
+                    complete++;
+                }
+
+                if( latch != NULL ) {
+                    latch->countDown();
+                }
+            }catch( lang::Exception& ex ){
+                ex.setMark( __FILE__, __LINE__ );
+            }
+        }
+
+        /** TaskListener callback: remembers that a task reported an exception. */
+        virtual void onTaskException(
+            lang::Runnable* task DECAF_UNUSED,
+            lang::Exception& ex DECAF_UNUSED) {
+            caughtEx = true;
+        }
+
+    public:
+
+        /** Task whose run() adds 100 to the value given at construction. */
+        class MyTask : public lang::Runnable
+        {
+        public:
+
+            int value;
+
+            MyTask( int x ) {
+                value = x;
+            }
+
+            virtual ~MyTask() {};
+
+            virtual void run(void) {
+                value += 100;
+            }
+        };
+
+        /**
+         * Task that announces it has started and then blocks on a mutex
+         * until another thread notifies it.  Neither pointer is owned.
+         */
+        class MyWaitingTask : public lang::Runnable
+        {
+        public:
+
+            Mutex* mutex;
+            CountDownLatch* startedLatch;
+
+            MyWaitingTask( Mutex* mutex, CountDownLatch* startedLatch ) {
+                this->mutex = mutex;
+                this->startedLatch = startedLatch;
+            }
+
+            virtual ~MyWaitingTask() {};
+
+            virtual void run(void) {
+                try
+                {
+                    synchronized(mutex) {
+                        // Counted down while the mutex is held, so whoever
+                        // awaits the latch and then locks the mutex gets
+                        // in only after wait() has released it.
+                        startedLatch->countDown();
+                        mutex->wait();
+                    }
+                }
+                catch( lang::Exception& ex ) {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
+
+    public:
+
+        virtual void test1();
+        virtual void test2();
+
+    };
+
+}}}
+
+#endif /*_DECAF_UTIL_CONCURRENT_THREADPOOLTEST_H_*/