You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/04/06 14:48:39 UTC

svn commit: r526142 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent: ThreadPoolTest.cpp ThreadPoolTest.h

Author: tabish
Date: Fri Apr  6 05:48:38 2007
New Revision: 526142

URL: http://svn.apache.org/viewvc?view=rev&rev=526142
Log:
http://issues.apache.org/activemq/browse/AMQCPP-97

Used CountDownLatch instead of sleeps.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp?view=diff&rev=526142&r1=526141&r2=526142
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.cpp Fri Apr  6 05:48:38 2007
@@ -18,3 +18,132 @@
 #include "ThreadPoolTest.h"
 
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::concurrent::ThreadPoolTest );
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolTest::test1()
+{
+    CountDownLatch myLatch( 3 );
+    this->latch = &myLatch;
+
+    MyTask task1( 1 );
+    MyTask task2( 2 );
+    MyTask task3( 3 );
+
+    this->complete = 0;
+    this->tasksToComplete = 3;
+
+    ThreadPool* pool = ThreadPool::getInstance();
+
+    pool->queueTask( ThreadPool::Task( &task1, this ) );
+    pool->queueTask( ThreadPool::Task( &task2, this ) );
+    pool->queueTask( ThreadPool::Task( &task3, this ) );
+
+    // Wait for them to finish, if we can't do this in 30 seconds then
+    // there's probably something really wrong.
+    myLatch.await( 30000 );
+
+    CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+
+    CPPUNIT_ASSERT( task1.value == 101 );
+    CPPUNIT_ASSERT( task2.value == 102 );
+    CPPUNIT_ASSERT( task3.value == 103 );
+
+    CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
+    CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+
+    CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+    CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+
+    pool->setMaxThreads(50);
+    pool->setBlockSize(50);
+
+    CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
+    CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+
+    // Give it a little time to create all those threads.
+    for( int i = 0; i < 1000; ++i ) {
+        if( pool->getFreeThreadCount() == pool->getPoolSize() ) {
+            break;
+        }
+
+        Thread::sleep( 100 );
+    }
+
+    CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
+    CPPUNIT_ASSERT( this->caughtEx == false );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolTest::test2() {
+
+    try
+    {
+        ThreadPool pool;
+        Mutex myMutex;
+
+        CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
+        CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+        pool.setMaxThreads(3);
+        pool.setBlockSize(1);
+        CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
+        CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
+        CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
+        pool.reserve( 4 );
+        CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
+        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
+
+        CountDownLatch startedLatch1(3);  // First three should go right away
+        CountDownLatch startedLatch2(1);  // The fourth one goes after others finish
+        CountDownLatch doneLatch(4);      // All should be done when we are at the end.
+
+        this->latch = &doneLatch;
+
+        MyWaitingTask task1( &myMutex, &startedLatch1 );
+        MyWaitingTask task2( &myMutex, &startedLatch1 );
+        MyWaitingTask task3( &myMutex, &startedLatch1 );
+        MyWaitingTask task4( &myMutex, &startedLatch2 );
+
+        this->complete = 0;
+        this->tasksToComplete = 4;
+
+        pool.queueTask( ThreadPool::Task( &task1, this ) );
+        pool.queueTask( ThreadPool::Task( &task2, this ) );
+        pool.queueTask( ThreadPool::Task( &task3, this ) );
+        pool.queueTask( ThreadPool::Task( &task4, this ) );
+
+        // Wait 30 seconds, then we let it fail because something is
+        // probably very wrong.
+        startedLatch1.await( 30000 );
+
+        CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 );
+        CPPUNIT_ASSERT( pool.getBacklog() == 1 );
+
+        // Wake up the tasks.
+        synchronized(&myMutex) {
+            myMutex.notifyAll();
+        }
+
+        // Wait 30 seconds, then we let it fail because something is
+        // probably very wrong.
+        startedLatch2.await( 30000 );
+
+        // Wake up the last task.
+        synchronized(&myMutex) {
+            myMutex.notifyAll();
+        }
+
+        // Wait for them to finish, if it takes longer than 30 seconds
+        // something is not right.
+        doneLatch.await( 30000 );
+
+        CPPUNIT_ASSERT( this->complete == this->tasksToComplete );
+        CPPUNIT_ASSERT( this->caughtEx == false );
+    }
+    catch( exceptions::ActiveMQException& ex ) {
+        ex.setMark( __FILE__, __LINE__ );
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h?view=diff&rev=526142&r1=526141&r2=526142
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/concurrent/ThreadPoolTest.h Fri Apr  6 05:48:38 2007
@@ -21,6 +21,7 @@
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 
+#include <activemq/concurrent/CountDownLatch.h>
 #include <activemq/concurrent/Concurrent.h>
 #include <activemq/concurrent/Thread.h>
 #include <activemq/concurrent/ThreadPool.h>
@@ -31,221 +32,104 @@
 namespace activemq{
 namespace concurrent{
 
-   class ThreadPoolTest :
-      public CppUnit::TestFixture,
-      public TaskListener
-   {
-      CPPUNIT_TEST_SUITE( ThreadPoolTest );
-      CPPUNIT_TEST( test1 );
-      CPPUNIT_TEST( test2 );
-      CPPUNIT_TEST_SUITE_END();
-
-      int tasksToComplete;
-      int complete;
-      Mutex mutex;
-      Mutex completeMutex;
-      bool caughtEx;
-
-   public:
-
-       ThreadPoolTest()
-      {
-         complete = 0;
-         tasksToComplete = 0;
-         caughtEx = false;
-      }
-
-       virtual ~ThreadPoolTest() {};
-
-      virtual void onTaskComplete(Runnable* task AMQCPP_UNUSED)
-      {
-        try{
-             synchronized(&mutex)
-             {
-                complete++;
+    class ThreadPoolTest :
+        public CppUnit::TestFixture,
+        public TaskListener
+    {
+        CPPUNIT_TEST_SUITE( ThreadPoolTest );
+        CPPUNIT_TEST( test1 );
+        CPPUNIT_TEST( test2 );
+        CPPUNIT_TEST_SUITE_END();
+
+        int tasksToComplete;
+        int complete;
+        Mutex mutex;
+        bool caughtEx;
+        CountDownLatch* latch;
 
-                if(tasksToComplete == complete)
-                {
-                   mutex.notifyAll();
-                }
-             }
-        }catch( exceptions::ActiveMQException& ex ){
-            ex.setMark( __FILE__, __LINE__ );
-        }
-      }
-
-      virtual void onTaskException(Runnable* task AMQCPP_UNUSED,
-        exceptions::ActiveMQException& ex AMQCPP_UNUSED)
-      {
-         caughtEx = true;
-      }
-
-   public:
-
-      class MyTask : public Runnable
-      {
-      public:
-
-         int value;
-
-         MyTask(int x)
-         {
-            value = x;
-         }
-
-         virtual ~MyTask() {};
-
-         virtual void run(void)
-         {
-            value += 100;
-         }
-      };
-
-      class MyWaitingTask : public Runnable
-      {
-      public:
-
-         Mutex* mutex;
-         Mutex* complete;
-
-         MyWaitingTask(Mutex* mutex, Mutex* complete)
-         {
-            this->mutex = mutex;
-            this->complete = complete;
-         }
-
-         virtual ~MyWaitingTask() {};
-
-         virtual void run(void)
-         {
-            try
-            {
-               synchronized(mutex)
-               {
-                  mutex->wait();
-               }
-
-               synchronized(complete)
-               {
-                   complete->notify();
-               }
-            }
-            catch( exceptions::ActiveMQException& ex )
-            {
-                ex.setMark( __FILE__, __LINE__ );
-            }
-         }
-      };
-
-   public:
-
-      void test2()
-      {
-         try
-         {
-            ThreadPool pool;
-            Mutex myMutex;
-
-            CPPUNIT_ASSERT( pool.getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
-            CPPUNIT_ASSERT( pool.getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
-            pool.setMaxThreads(3);
-            pool.setBlockSize(1);
-            CPPUNIT_ASSERT( pool.getMaxThreads() == 3 );
-            CPPUNIT_ASSERT( pool.getBlockSize() == 1 );
-            CPPUNIT_ASSERT( pool.getPoolSize() == 0 );
-            pool.reserve( 4 );
-            CPPUNIT_ASSERT( pool.getPoolSize() == 3 );
-            CPPUNIT_ASSERT( pool.getFreeThreadCount() == 3 );
-
-            MyWaitingTask task1(&myMutex, &completeMutex);
-            MyWaitingTask task2(&myMutex, &completeMutex);
-            MyWaitingTask task3(&myMutex, &completeMutex);
-            MyWaitingTask task4(&myMutex, &completeMutex);
+    public:
 
+        ThreadPoolTest() {
             complete = 0;
-            tasksToComplete = 4;
-
-            pool.queueTask(ThreadPool::Task(&task1, this));
-            pool.queueTask(ThreadPool::Task(&task2, this));
-            pool.queueTask(ThreadPool::Task(&task3, this));
-            pool.queueTask(ThreadPool::Task(&task4, this));
-
-            Thread::sleep( 1000 );
-
-            CPPUNIT_ASSERT( pool.getFreeThreadCount() == 0 );
-            CPPUNIT_ASSERT( pool.getBacklog() == 1 );
-
-            int count = 0;
-            while(complete != tasksToComplete && count < 100)
-            {
-               synchronized(&myMutex)
-               {
-                  myMutex.notifyAll();
-               }
-
-               synchronized(&completeMutex)
-               {
-                  completeMutex.wait(1000);
-               }
+            tasksToComplete = 0;
+            caughtEx = false;
+            latch = NULL;
+        }
 
-               count++;
-            }
+        virtual ~ThreadPoolTest() {}
 
-            CPPUNIT_ASSERT( complete == tasksToComplete );
-            CPPUNIT_ASSERT( caughtEx == false );
-         }
-         catch( exceptions::ActiveMQException& ex )
-         {
-            ex.setMark( __FILE__, __LINE__ );
-         }
-      }
+        virtual void onTaskComplete(Runnable* task AMQCPP_UNUSED)
+        {
+            try{
 
-      void test1()
-      {
-         MyTask task1(1);
-         MyTask task2(2);
-         MyTask task3(3);
+                complete++;
 
-         complete = 0;
-         tasksToComplete = 3;
+                if( latch != NULL ) {
+                    latch->countDown();
+                }
+            }catch( exceptions::ActiveMQException& ex ){
+                ex.setMark( __FILE__, __LINE__ );
+            }
+        }
 
-         ThreadPool* pool = ThreadPool::getInstance();
+        virtual void onTaskException(Runnable* task AMQCPP_UNUSED,
+            exceptions::ActiveMQException& ex AMQCPP_UNUSED) {
+            caughtEx = true;
+        }
 
-         // Can't check this here since one of the other tests might
-         // have used the global thread pool.
-         // CPPUNIT_ASSERT( pool->getPoolSize() == 0 );
+    public:
 
-         pool->queueTask(ThreadPool::Task(&task1, this));
-         pool->queueTask(ThreadPool::Task(&task2, this));
-         pool->queueTask(ThreadPool::Task(&task3, this));
+        class MyTask : public Runnable
+        {
+        public:
 
-         Thread::sleep(1000);
+            int value;
 
-         CPPUNIT_ASSERT( complete == tasksToComplete );
+            MyTask( int x ) {
+                value = x;
+            }
 
-         CPPUNIT_ASSERT( task1.value == 101 );
-         CPPUNIT_ASSERT( task2.value == 102 );
-         CPPUNIT_ASSERT( task3.value == 103 );
+            virtual ~MyTask() {};
 
-         CPPUNIT_ASSERT( pool->getPoolSize() > 0 );
-         CPPUNIT_ASSERT( pool->getBacklog() == 0 );
+            virtual void run(void) {
+                value += 100;
+            }
+        };
 
-         CPPUNIT_ASSERT( pool->getMaxThreads() == ThreadPool::DEFAULT_MAX_POOL_SIZE );
-         CPPUNIT_ASSERT( pool->getBlockSize() == ThreadPool::DEFAULT_MAX_BLOCK_SIZE );
+        class MyWaitingTask : public Runnable
+        {
+        public:
+
+            Mutex* mutex;
+            CountDownLatch* startedLatch;
+
+            MyWaitingTask( Mutex* mutex, CountDownLatch* startedLatch ) {
+                this->mutex = mutex;
+                this->startedLatch = startedLatch;
+            }
 
-         pool->setMaxThreads(50);
-         pool->setBlockSize(50);
+            virtual ~MyWaitingTask() {};
 
-         CPPUNIT_ASSERT( pool->getMaxThreads() == 50 );
-         CPPUNIT_ASSERT( pool->getBlockSize() == 50 );
+            virtual void run(void) {
+                try
+                {
+                    synchronized(mutex) {
+                        startedLatch->countDown();
+                        mutex->wait();
+                    }
+                }
+                catch( exceptions::ActiveMQException& ex ) {
+                    ex.setMark( __FILE__, __LINE__ );
+                }
+            }
+        };
 
-         Thread::sleep(1000);
+    public:
 
-         CPPUNIT_ASSERT( pool->getFreeThreadCount() == pool->getPoolSize() );
-         CPPUNIT_ASSERT( caughtEx == false );
+        virtual void test1();
+        virtual void test2();
 
-      }
-   };
+    };
 
 }}