You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:59 UTC

[7/7] activemq-artemis git commit: ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow

ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0fadc68c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0fadc68c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0fadc68c

Branch: refs/heads/master
Commit: 0fadc68ca503eb35d75ac95292cd85339dc8b017
Parents: 3c5b57f
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Nov 8 12:03:49 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ProcessorBase.java     | 72 +++++++++++++-------
 1 file changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0fadc68c/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 8d19c22..73dbf2f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -21,11 +21,13 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 
 public abstract class ProcessorBase<T> {
 
    private static final int STATE_NOT_RUNNING = 0;
    private static final int STATE_RUNNING = 1;
+   private static final int STATE_FORCED_SHUTDOWN = 2;
 
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
@@ -33,12 +35,11 @@ public abstract class ProcessorBase<T> {
 
    private final ExecutorTask task = new ExecutorTask();
 
-   private final Object startedGuard = new Object();
-   private volatile boolean started = true;
-
    // used by stateUpdater
    @SuppressWarnings("unused")
-   private volatile int state = 0;
+   private volatile int state = STATE_NOT_RUNNING;
+
+   private volatile boolean requestedShutdown = false;
 
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
@@ -47,26 +48,22 @@ public abstract class ProcessorBase<T> {
       @Override
       public void run() {
          do {
-            //if there is no thread active then we run
+            //if no thread is active and the processor has not been forcibly shut down, then we run
             if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
-               T task = tasks.poll();
-               //while the queue is not empty we process in order
-
-               // All we care on started, is that a current task is not running as we call shutdown.
-               // for that reason this first run doesn't need to be under any lock
-               while (task != null && started) {
-
-                  // Synchronized here is just to guarantee that a current task is finished before
-                  // the started update can be taken as false
-                  synchronized (startedGuard) {
-                     if (started) {
+               try {
+                  T task = tasks.poll();
+                  //while the queue is not empty we process in order
+                  while (task != null) {
+                     //if a shutdown has been requested, just drain the tasks without running them, to help the shutdown process
+                     if (!requestedShutdown) {
                         doTask(task);
                      }
+                     task = tasks.poll();
                   }
-                  task = tasks.poll();
+               } finally {
+                  //set state back to not running.
+                  stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
                }
-               //set state back to not running.
-               stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
             } else {
                return;
             }
@@ -81,10 +78,28 @@ public abstract class ProcessorBase<T> {
    /** It will wait the current execution (if there is one) to finish
     *  but will not complete any further executions */
    public void shutdownNow() {
-      synchronized (startedGuard) {
-         started = false;
+      //signal to everyone that (at least) an immediate shutdown has been requested
+      requestedShutdown = true;
+      //it could take a very long time depending on the current executing task
+      do {
+         //the ExecutorTask (if it is running) has been alerted to just drain the current backlog of tasks
+         final int startState = stateUpdater.get(this);
+         if (startState == STATE_FORCED_SHUTDOWN) {
+            //another thread has completed a forced shutdown
+            return;
+         }
+         if (startState == STATE_RUNNING) {
+            //wait 100 ms to avoid burning CPU while waiting and
+            //give other threads a chance to make progress
+            LockSupport.parkNanos(100_000_000L);
+         }
       }
+      while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN));
+      //this can happen only once: the forced shutdown state is the final one and
+      //can be set by just one caller.
+      //As noted in the task method, there is a small chance that some tasks were enqueued in the meantime
       tasks.clear();
+      //we could report the discarded tasks somehow: ExecutorService does the same on shutdownNow
    }
 
    protected abstract void doTask(T task);
@@ -98,11 +113,18 @@ public abstract class ProcessorBase<T> {
    }
 
    protected void task(T command) {
-      // There is no need to verify the lock here.
-      // you can only turn of running once
-      if (started) {
+      if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
+         //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
          tasks.add(command);
-         startPoller();
+         //cache locally the state to avoid multiple volatile loads
+         final int state = stateUpdater.get(this);
+         if (state == STATE_FORCED_SHUTDOWN) {
+            //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow that finished before tasks.add
+            tasks.clear();
+         } else if (state == STATE_NOT_RUNNING) {
+            //startPoller could be deleted, but it is kept because it is inherited
+            delegate.execute(task);
+         }
       }
    }