You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/31 23:02:03 UTC

activemq-artemis git commit: ARTEMIS-1078 Improving ActiveMQThreadPoolExecutor

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 05ca44d90 -> 5a31e7035


ARTEMIS-1078 Improving ActiveMQThreadPoolExecutor

Only threads that are waiting on the queue for new tasks are now considered idle.

The thread pool maintained a counter of active threads, but that counter was incremented
too late, in the beforeExecute method. Submitting a task created a new thread.
If a second task was then submitted before the new thread had started to execute its task,
the second task was queued without creating a second thread. So the second task was only
executed after the first task had completed - even if the thread pool's
maximum number of threads had not been reached.

This fix now maintains the delta between the number of idle threads (those currently waiting
in the queue's poll or take methods) and the number of queued tasks.
It creates new threads unless there are enough idle threads to pick up all queued tasks.

This closes #1144


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5a31e703
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5a31e703
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5a31e703

Branch: refs/heads/master
Commit: 5a31e7035354516664087a65ac97e1a6f4c13f14
Parents: 05ca44d
Author: Bernd Gutjahr <be...@hpe.com>
Authored: Tue Mar 28 15:54:58 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 31 19:01:54 2017 -0400

----------------------------------------------------------------------
 .../utils/ActiveMQThreadPoolExecutor.java       | 120 ++++++++++++++-----
 1 file changed, 91 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5a31e703/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index a87b18a..c3b1988 100755
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -20,7 +20,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /*
  * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@@ -47,31 +46,110 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
 
       private ActiveMQThreadPoolExecutor executor = null;
 
+      // lock object to synchronize on
+      private final Object lock = new Object();
+
+      // keep track of the difference between the number of idle threads and
+      // the number of queued tasks. If the delta is > 0, we have more
+      // idle threads than queued tasks and can add more tasks into the queue.
+      // The delta is incremented if a thread becomes idle or if a task is taken from the queue.
+      // The delta is decremented if a thread leaves idle state or if a task is added to the queue.
+      private int threadTaskDelta = 0;
+
       public void setExecutor(ActiveMQThreadPoolExecutor executor) {
          this.executor = executor;
       }
 
       @Override
       public boolean offer(Runnable runnable) {
-         int poolSize = executor.getPoolSize();
+         boolean retval = false;
+
+         // Need to lock for 2 reasons:
+         // 1. to safely handle poll timeouts
+         // 2. to protect the delta from parallel updates
+         synchronized (lock) {
+            if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) {
+               // A new task will be added to the queue if the maximum number of threads has been reached
+               // or if the delta is > 0, which means that there are enough idle threads.
+
+               retval = super.offer(runnable);
+
+               // Only decrement the delta if the task has actually been added to the queue
+               if (retval)
+                  threadTaskDelta--;
+            }
+         }
 
-         // If the are less threads than the configured maximum, then the tasks is
-         // only queued if there are some idle threads that can run that tasks.
-         // We have to add the queue size, since some tasks might just have been queued
-         // but not yet taken by an idle thread.
-         if (poolSize < executor.getMaximumPoolSize() && (size() + executor.getActive()) >= poolSize)
-            return false;
+         return retval;
+      }
 
-         return super.offer(runnable);
+      @Override
+      public Runnable take() throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to take from the queue
+         synchronized (lock) {
+            threadTaskDelta++;
+         }
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.take();
+            return runnable;
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            // Only if no task had been taken, we have to decrement the delta.
+            if (runnable == null) {
+               synchronized (lock) {
+                  threadTaskDelta--;
+               }
+            }
+         }
+      }
+
+      @Override
+      public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to poll from the queue
+         synchronized (lock) {
+            threadTaskDelta++;
+         }
+
+         Runnable runnable = null;
+         boolean timedOut = false;
+
+         try {
+            runnable = super.poll(arg0, arg2);
+            timedOut = (runnable == null);
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            if (runnable == null) {
+               synchronized (lock) {
+                  // If the poll call timed out, we check again within a synchronized block
+                  // to make sure all offer calls have been completed.
+                  // This is to handle a newly queued task if the timeout occurred while an offer call
+                  // added that task to the queue instead of creating a new thread.
+                  if (timedOut)
+                     runnable = super.poll();
+
+                  // Only if no task had been taken (either no timeout, or no task from after-timeout poll),
+                  // we have to decrement the delta.
+                  if (runnable == null)
+                     threadTaskDelta--;
+               }
+            }
+         }
+
+         return runnable;
       }
    }
 
    private int maxPoolSize;
 
-   // count the active threads with before-/afterExecute, since the .getActiveCount is not very
-   // efficient.
-   private final AtomicInteger active = new AtomicInteger(0);
-
    public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) {
       this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
    }
@@ -88,10 +166,6 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
       myQueue.setExecutor(this);
    }
 
-   private int getActive() {
-      return active.get();
-   }
-
    @Override
    public int getMaximumPoolSize() {
       return maxPoolSize;
@@ -101,16 +175,4 @@ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
    public void setMaximumPoolSize(int maxSize) {
       maxPoolSize = maxSize;
    }
-
-   @Override
-   protected void beforeExecute(Thread thread, Runnable runnable) {
-      super.beforeExecute(thread, runnable);
-      active.incrementAndGet();
-   }
-
-   @Override
-   protected void afterExecute(Runnable runnable, Throwable throwable) {
-      active.decrementAndGet();
-      super.afterExecute(runnable, throwable);
-   }
 }