You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/06 14:48:39 UTC
svn commit: r526142 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent:
ThreadPoolTest.cpp ThreadPoolTest.h
Author: tabish
Date: Fri Apr 6 05:48:38 2007
New Revision: 526142
URL: http://svn.apache.org/viewvc?view=rev&rev=526142
Log:
http://issues.apache.org/activemq/browse/AMQCPP-97
Used CountDownLatch instead of sleeps.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp?view=diff&rev=526142&r1=526141&r2=526142
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp Fri Apr 6 05:48:38 2007
@@ -18,3 +18,132 @@
#include "ThreadPoolTest.h"
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::concurrent::ThreadPoolTest );
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolTest::test1() // Queues three MyTask objects on the ThreadPool::getInstance() pool, then checks the task results and the pool's counters.
+{
+ CountDownLatch myLatch( 3 ); // one count per task queued below
+ this->latch = &myLatch; // onTaskComplete() counts this latch down; NOTE(review): the member still points at this local after test1() returns
+
+ MyTask task1( 1 ); // MyTask::run() adds 100 to the value given here
+ MyTask task2( 2 );
+ MyTask task3( 3 );
+
+ this->complete = 0; // incremented by onTaskComplete()
+ this->tasksToComplete = 3;
+
+ ThreadPool* pool = ThreadPool::getInstance();
+
+ pool->queueTask( ThreadPool::Task( &task1, this ) ); // presumably the second argument registers this fixture as the task's listener - confirm against ThreadPool::Task
+ pool->queueTask( ThreadPool::Task( &task2, this ) );
+ pool->queueTask( ThreadPool::Task( &task3, this ) );
+
+ // Wait for them to finish, if we can't do this in 30 seconds then
+ // there's probably something really wrong.
+ myLatch.await( 30000 ); // the outcome of the wait is not inspected here; the completion assert below covers a timeout
+
+ CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+
+ CPPUNIT_ASSERT( task1.value == 101 ); // initial value plus the 100 added by run()
+ CPPUNIT_ASSERT( task2.value == 102 );
+ CPPUNIT_ASSERT( task3.value == 103 );
+
+ CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
+ CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+
+ CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+ CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+
+ pool->setMaxThreads(50); // NOTE(review): these settings are applied to the getInstance() pool and are not restored by this test
+ pool->setBlockSize(50);
+
+ CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
+ CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+
+ // Give it a little time to create all those threads.
+ for( int i = 0; i < 1000; ++i ) { // bounded polling: at most 1000 checks with a sleep between them
+ if( pool->getFreeThreadCount() == pool->getPoolSize() ) { // stop as soon as every pooled thread is reported free
+ break;
+ }
+
+ Thread::sleep( 100 );
+ }
+
+ CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
+ CPPUNIT_ASSERT( this->caughtEx == false ); // caughtEx is set by onTaskException()
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolTest::test2() { // Runs four MyWaitingTask objects on a local pool limited to three threads and checks that one task is backlogged until the others are released.
+
+ try
+ {
+ ThreadPool pool; // a pool local to this test, not the ThreadPool::getInstance() one
+ Mutex myMutex; // the tasks wait() on this mutex; the test calls notifyAll() on it to release them
+
+ CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+ CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+ pool.setMaxThreads(3);
+ pool.setBlockSize(1);
+ CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
+ CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
+ CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
+ pool.reserve( 4 ); // asks for 4; the asserts below expect the pool to hold only 3, matching setMaxThreads(3)
+ CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
+ CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
+
+ CountDownLatch startedLatch1(3); // First three should go right away
+ CountDownLatch startedLatch2(1); // The fourth one goes after others finish
+ CountDownLatch doneLatch(4); // All should be done when we are at the end.
+
+ this->latch = &doneLatch; // onTaskComplete() counts this latch down; NOTE(review): the member is not reset before doneLatch goes out of scope
+
+ MyWaitingTask task1( &myMutex, &startedLatch1 ); // each task counts down its started latch, then waits on myMutex
+ MyWaitingTask task2( &myMutex, &startedLatch1 );
+ MyWaitingTask task3( &myMutex, &startedLatch1 );
+ MyWaitingTask task4( &myMutex, &startedLatch2 );
+
+ this->complete = 0; // incremented by onTaskComplete()
+ this->tasksToComplete = 4;
+
+ pool.queueTask( ThreadPool::Task( &task1, this ) );
+ pool.queueTask( ThreadPool::Task( &task2, this ) );
+ pool.queueTask( ThreadPool::Task( &task3, this ) );
+ pool.queueTask( ThreadPool::Task( &task4, this ) );
+
+ // Wait 30 seconds, then we let it fail because something is
+ // probably very wrong.
+ startedLatch1.await( 30000 ); // the outcome of the wait is not inspected; the asserts below cover a timeout
+
+ CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 ); // all three threads are expected to be occupied by waiting tasks
+ CPPUNIT_ASSERT( pool.getBacklog() == 1 ); // the fourth task is expected to still be queued
+
+ // Wake up the tasks.
+ synchronized(&myMutex) {
+ myMutex.notifyAll();
+ }
+
+ // Wait 30 seconds, then we let it fail because something is
+ // probably very wrong.
+ startedLatch2.await( 30000 ); // task4 counts this latch down just before it waits on myMutex
+
+ // Wake up the last task.
+ synchronized(&myMutex) {
+ myMutex.notifyAll();
+ }
+
+ // Wait for them to finish, if it takes longer than 30 seconds
+ // something is not right.
+ doneLatch.await( 30000 );
+
+ CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+ CPPUNIT_ASSERT( this->caughtEx == false ); // caughtEx is set by onTaskException()
+ }
+ catch( exceptions::ActiveMQException& ex ) {
+ ex.setMark( __FILE__, __LINE__ ); // NOTE(review): only setMark() is called; the exception is neither rethrown nor turned into an assertion
+ }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h?view=diff&rev=526142&r1=526141&r2=526142
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h Fri Apr 6 05:48:38 2007
@@ -21,6 +21,7 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
+#include <activemq/concurrent/CountDownLatch.h>
#include <activemq/concurrent/Concurrent.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/ThreadPool.h>
@@ -31,221 +32,104 @@
namespace activemq{
namespace concurrent{
- class ThreadPoolTest :
- public CppUnit::TestFixture,
- public TaskListener
- {
- CPPUNIT_TEST_SUITE( ThreadPoolTest );
- CPPUNIT_TEST( test1 );
- CPPUNIT_TEST( test2 );
- CPPUNIT_TEST_SUITE_END();
-
- int tasksToComplete;
- int complete;
- Mutex mutex;
- Mutex completeMutex;
- bool caughtEx;
-
- public:
-
- ThreadPoolTest()
- {
- complete = 0;
- tasksToComplete = 0;
- caughtEx = false;
- }
-
- virtual ~ThreadPoolTest() {};
-
- virtual void onTaskComplete(Runnable* task AMQCPP_UNUSED)
- {
- try{
- synchronized(&mutex)
- {
- complete++;
+ class ThreadPoolTest :
+ public CppUnit::TestFixture,
+ public TaskListener
+ {
+ CPPUNIT_TEST_SUITE( ThreadPoolTest );
+ CPPUNIT_TEST( test1 );
+ CPPUNIT_TEST( test2 );
+ CPPUNIT_TEST_SUITE_END();
+
+ int tasksToComplete;
+ int complete;
+ Mutex mutex;
+ bool caughtEx;
+ CountDownLatch* latch;
- if(tasksToComplete == complete)
- {
- mutex.notifyAll();
- }
- }
- }catch( exceptions::ActiveMQException& ex ){
- ex.setMark( __FILE__, __LINE__ );
- }
- }
-
- virtual void onTaskException(Runnable* task AMQCPP_UNUSED,
- exceptions::ActiveMQException& ex AMQCPP_UNUSED)
- {
- caughtEx = true;
- }
-
- public:
-
- class MyTask : public Runnable
- {
- public:
-
- int value;
-
- MyTask(int x)
- {
- value = x;
- }
-
- virtual ~MyTask() {};
-
- virtual void run(void)
- {
- value += 100;
- }
- };
-
- class MyWaitingTask : public Runnable
- {
- public:
-
- Mutex* mutex;
- Mutex* complete;
-
- MyWaitingTask(Mutex* mutex, Mutex* complete)
- {
- this->mutex = mutex;
- this->complete = complete;
- }
-
- virtual ~MyWaitingTask() {};
-
- virtual void run(void)
- {
- try
- {
- synchronized(mutex)
- {
- mutex->wait();
- }
-
- synchronized(complete)
- {
- complete->notify();
- }
- }
- catch( exceptions::ActiveMQException& ex )
- {
- ex.setMark( __FILE__, __LINE__ );
- }
- }
- };
-
- public:
-
- void test2()
- {
- try
- {
- ThreadPool pool;
- Mutex myMutex;
-
- CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
- CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
- pool.setMaxThreads(3);
- pool.setBlockSize(1);
- CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
- CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
- CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
- pool.reserve( 4 );
- CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
- CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
-
- MyWaitingTask task1(&myMutex, &completeMutex);
- MyWaitingTask task2(&myMutex, &completeMutex);
- MyWaitingTask task3(&myMutex, &completeMutex);
- MyWaitingTask task4(&myMutex, &completeMutex);
+ public:
+ ThreadPoolTest() { // Resets the counters and flag, and starts with no latch attached.
complete = 0;
- tasksToComplete = 4;
-
- pool.queueTask(ThreadPool::Task(&task1, this));
- pool.queueTask(ThreadPool::Task(&task2, this));
- pool.queueTask(ThreadPool::Task(&task3, this));
- pool.queueTask(ThreadPool::Task(&task4, this));
-
- Thread::sleep( 1000 );
-
- CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 );
- CPPUNIT_ASSERT( pool.getBacklog() == 1 );
-
- int count = 0;
- while(complete != tasksToComplete && count < 100)
- {
- synchronized(&myMutex)
- {
- myMutex.notifyAll();
- }
-
- synchronized(&completeMutex)
- {
- completeMutex.wait(1000);
- }
+ tasksToComplete = 0;
+ caughtEx = false;
+ latch = NULL; // onTaskComplete() skips the countDown() while this is NULL; each test assigns its own latch
+ }
- count++;
- }
+ virtual ~ThreadPoolTest() {}
- CPPUNIT_ASSERT( complete == tasksToComplete );
- CPPUNIT_ASSERT( caughtEx == false );
- }
- catch( exceptions::ActiveMQException& ex )
- {
- ex.setMark( __FILE__, __LINE__ );
- }
- }
+ virtual void onTaskComplete(Runnable* task AMQCPP_UNUSED) // Increments the completion counter and counts down the latch if one is set.
+ {
+ try{
- void test1()
- {
- MyTask task1(1);
- MyTask task2(2);
- MyTask task3(3);
+ complete++; // NOTE(review): the synchronized(&mutex) guard of the previous revision is gone; confirm this cannot run on several pool threads at once
- complete = 0;
- tasksToComplete = 3;
+ if( latch != NULL ) { // latch stays NULL until a test assigns one
+ latch->countDown();
+ }
+ }catch( exceptions::ActiveMQException& ex ){
+ ex.setMark( __FILE__, __LINE__ ); // the exception is marked here and not propagated further
+ }
+ }
- ThreadPool* pool = ThreadPool::getInstance();
+ virtual void onTaskException(Runnable* task AMQCPP_UNUSED, // Sets caughtEx; both tests assert that the flag is still false when they finish.
+ exceptions::ActiveMQException& ex AMQCPP_UNUSED) {
+ caughtEx = true;
+ }
- // Can't check this here since one of the other tests might
- // have used the global thread pool.
- // CPPUNIT_ASSERT( pool->getPoolSize() == 0 );
+ public:
- pool->queueTask(ThreadPool::Task(&task1, this));
- pool->queueTask(ThreadPool::Task(&task2, this));
- pool->queueTask(ThreadPool::Task(&task3, this));
+ class MyTask : public Runnable // Runnable whose run() adds 100 to a stored integer; used by test1.
+ {
+ public:
- Thread::sleep(1000);
+ int value; // public so the test can read the result directly
- CPPUNIT_ASSERT( complete == tasksToComplete );
+ MyTask( int x ) { // x is the starting value
+ value = x;
+ }
- CPPUNIT_ASSERT( task1.value == 101 );
- CPPUNIT_ASSERT( task2.value == 102 );
- CPPUNIT_ASSERT( task3.value == 103 );
+ virtual ~MyTask() {};
- CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
- CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+ virtual void run(void) {
+ value += 100; // test1 asserts start value + 100 for each task
+ }
+ };
- CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
- CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+ class MyWaitingTask : public Runnable // Runnable that counts down a "started" latch and then waits on a mutex until notified; used by test2.
+ {
+ public:
+
+ Mutex* mutex; // waited on in run(); not owned by the task
+ CountDownLatch* startedLatch; // counted down once when run() begins; not owned by the task
+
+ MyWaitingTask( Mutex* mutex, CountDownLatch* startedLatch ) { // stores both pointers as given
+ this->mutex = mutex;
+ this->startedLatch = startedLatch;
+ }
- pool->setMaxThreads(50);
- pool->setBlockSize(50);
+ virtual ~MyWaitingTask() {};
- CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
- CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+ virtual void run(void) {
+ try
+ {
+ synchronized(mutex) {
+ startedLatch->countDown(); // done inside the synchronized block - presumably so a notifier that also synchronizes on the mutex cannot notify before wait() is entered; confirm Mutex::wait() releases the lock
+ mutex->wait(); // no timeout: returns only once the test calls notifyAll() on the same mutex
+ }
+ }
+ catch( exceptions::ActiveMQException& ex ) {
+ ex.setMark( __FILE__, __LINE__ ); // the exception is marked here and not propagated further
+ }
+ }
+ };
- Thread::sleep(1000);
+ public:
- CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
- CPPUNIT_ASSERT( caughtEx == false );
+ virtual void test1();
+ virtual void test2();
- }
- };
+ };
}}