You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/06 21:26:32 UTC

svn commit: r1089595 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/ test/decaf/util/concurrent/

Author: tabish
Date: Wed Apr  6 19:26:32 2011
New Revision: 1089595

URL: http://svn.apache.org/viewvc?rev=1089595&view=rev
Log:
Additional improvements to ThreadPoolExecutor and added tests.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Wed Apr  6 19:26:32 2011
@@ -49,6 +49,8 @@ namespace concurrent{
     class ExecutorKernel {
     public:
 
+        ThreadPoolExecutor* parent;
+
         LinkedList<Worker*> workers;
         LinkedList<Worker*> deadWorkers;
         Timer cleanupTimer;
@@ -65,9 +67,13 @@ namespace concurrent{
         Mutex mainLock;
         CountDownLatch termination;
 
+        long long completedTasks;
+        int largestPoolSize;
+
     public:
 
-        ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+        ExecutorKernel(ThreadPoolExecutor* parent,
+                       int corePoolSize, int maxPoolSize, long long keepAliveTime,
                        BlockingQueue<decaf::lang::Runnable*>* workQueue);
 
         ~ExecutorKernel();
@@ -92,6 +98,8 @@ namespace concurrent{
 
         void handleWorkerExit(Worker* worker);
 
+        void tryTerminate();
+
     };
 
     class Worker : public lang::Thread {
@@ -208,7 +216,7 @@ namespace concurrent{
 ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize,
                                        long long keepAliveTime, const TimeUnit& unit,
                                        BlockingQueue<decaf::lang::Runnable*>* workQueue) :
-    kernel(new ExecutorKernel(corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
+    kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -260,7 +268,12 @@ bool ThreadPoolExecutor::awaitTerminatio
 
 ////////////////////////////////////////////////////////////////////////////////
 int ThreadPoolExecutor::getPoolSize() const {
-    return (int)this->kernel->workers.size();
+    int result = 0;
+    synchronized(&this->kernel->mainLock) {
+        result = this->kernel->workers.size();
+    }
+
+    return result;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -279,6 +292,39 @@ long long ThreadPoolExecutor::getTaskCou
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getActiveCount() const {
+
+    int result = 0;
+    synchronized(&this->kernel->mainLock) {
+        if(!this->kernel->terminated.get()) {
+            result = this->kernel->workers.size() - this->kernel->freeThreads.get();
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ThreadPoolExecutor::getCompletedTaskCount() const {
+    long long result = 0;
+    synchronized(&this->kernel->mainLock) {
+        result = this->kernel->completedTasks;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ThreadPoolExecutor::getLargestPoolSize() const {
+    int result = 0;
+    synchronized(&this->kernel->mainLock) {
+        result = this->kernel->largestPoolSize;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ThreadPoolExecutor::isShutdown() const {
     return this->kernel->stopped.get();
 }
@@ -293,8 +339,10 @@ void ThreadPoolExecutor::terminated() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime,
+ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize,
+                               int maxPoolSize, long long keepAliveTime,
                                BlockingQueue<decaf::lang::Runnable*>* workQueue) :
+    parent(parent),
     workers(),
     deadWorkers(),
     cleanupTimer(),
@@ -307,7 +355,9 @@ ExecutorKernel::ExecutorKernel(int coreP
     keepAliveTime(keepAliveTime),
     workQueue(workQueue),
     mainLock(),
-    termination(1) {
+    termination(1),
+    completedTasks(0),
+    largestPoolSize(0) {
 
     if(corePoolSize < 0 || maxPoolSize <= 0 ||
        maxPoolSize < corePoolSize || keepAliveTime < 0) {
@@ -326,7 +376,9 @@ ExecutorKernel::ExecutorKernel(int coreP
 ////////////////////////////////////////////////////////////////////////////////
 ExecutorKernel::~ExecutorKernel() {
     try{
+
         this->shutdown();
+        this->tryTerminate();
 
         this->termination.await();
 
@@ -376,6 +428,7 @@ void ExecutorKernel::onTaskCompleted(Wor
     try {
         synchronized(&mainLock) {
             freeThreads.incrementAndGet();
+            completedTasks++;
         }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
@@ -400,9 +453,7 @@ void ExecutorKernel::handleWorkerExit(Wo
         this->deadWorkers.add(worker);
 
         if(this->workers.isEmpty()) {
-
-            // TODO - Notify ThreadPoolExecutor to call terminated()
-
+            this->parent->terminated();
             this->terminated = true;
             this->termination.countDown();
         }
@@ -485,6 +536,7 @@ void ExecutorKernel::AllocateThread() {
             this->workers.add(newWorker);
             freeThreads.incrementAndGet();
             newWorker->start();
+            this->largestPoolSize++;
         }
     }
     DECAF_CATCH_RETHROW( lang::Exception )
@@ -537,3 +589,25 @@ bool ExecutorKernel::awaitTermination(lo
 
     return this->termination.await(timeout, unit);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ExecutorKernel::tryTerminate() {
+
+    if (!this->isStoppedOrStopping() || (this->isStoppedOrStopping() && !this->workQueue->isEmpty())) {
+        return;
+    }
+
+    if (this->workers.size() > 0) {
+        // TODO - Once they are interruptible wake a worker.
+        return;
+    }
+
+    synchronized(&this->mainLock) {
+        try {
+            this->parent->terminated();
+        } catch(...) {}
+
+        this->terminated.set(true);
+        this->termination.countDown();
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Wed Apr  6 19:26:32 2011
@@ -58,6 +58,7 @@ namespace concurrent{
 
     private:
 
+        friend class ExecutorKernel;
         ExecutorKernel* kernel;
 
     public:
@@ -165,6 +166,30 @@ namespace concurrent{
         virtual long long getTaskCount() const;
 
         /**
+         * Returns an approximation of the number of threads that are currently running
+         * tasks for this executor.  This value can change rapidly.
+         *
+         * @return the number of currently active threads.
+         */
+        virtual int getActiveCount() const;
+
+        /**
+         * Returns the approximate number of Tasks that have been completed by this
+         * Executor, this value never decreases.
+         *
+         * @return the number of completed tasks since creation of the Executor.
+         */
+        virtual long long getCompletedTaskCount() const;
+
+        /**
+         * Returns the most Threads that have ever been active at one time within this
+         * Executors Thread pool.
+         *
+         * @return the largest number of threads ever to coexist in this executor.
+         */
+        virtual int getLargestPoolSize() const;
+
+        /**
          * Returns whether this executor has been shutdown or not.
          *
          * @return true if this executor has been shutdown.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Wed Apr  6 19:26:32 2011
@@ -95,6 +95,17 @@ namespace {
 }
 
 ///////////////////////////////////////////////////////////////////////////////
+void ThreadPoolExecutorTest::testConstructor1() {
+
+    ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+
+    CPPUNIT_ASSERT_EQUAL(1, pool.getCorePoolSize());
+    CPPUNIT_ASSERT_EQUAL(3, pool.getMaximumPoolSize());
+    CPPUNIT_ASSERT_EQUAL(false, pool.isShutdown());
+    CPPUNIT_ASSERT_EQUAL(false, pool.isTerminated());
+}
+
+///////////////////////////////////////////////////////////////////////////////
 void ThreadPoolExecutorTest::testSimpleTasks()
 {
     CountDownLatch myLatch( 3 );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1089595&r1=1089594&r2=1089595&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Wed Apr  6 19:26:32 2011
@@ -36,6 +36,7 @@ namespace concurrent{
     private:
 
         CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest );
+        CPPUNIT_TEST( testConstructor1 );
         CPPUNIT_TEST( testSimpleTasks );
         CPPUNIT_TEST( testMoreTasksThanMaxPoolSize );
         CPPUNIT_TEST( testTasksThatThrow );
@@ -51,6 +52,7 @@ namespace concurrent{
         ThreadPoolExecutorTest() {}
         virtual ~ThreadPoolExecutorTest() {}
 
+        void testConstructor1();
         void testSimpleTasks();
         void testMoreTasksThanMaxPoolSize();
         void testTasksThatThrow();