You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:58 UTC

[6/7] activemq-artemis git commit: ARTEMIS-1495 Few perf improvements to: - reduce volatile loads - allow method inlining for hot execution paths - reduced pointers chasing due to inner classes uses

ARTEMIS-1495 Few perf improvements to:
 - reduce volatile loads
 - allow method inlining for hot execution paths
 - reduced pointers chasing due to inner classes uses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33b3eb6f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33b3eb6f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33b3eb6f

Branch: refs/heads/master
Commit: 33b3eb6f095da4a21648c268c7a960e55f414ca3
Parents: 91db080
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Nov 9 11:26:21 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   |  25 +++-
 .../artemis/utils/actors/ProcessorBase.java     | 135 +++++++++++--------
 .../utils/actors/OrderedExecutorSanityTest.java |   4 +-
 3 files changed, 99 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 8efb3d3..9903d65 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,11 +17,10 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public interface ArtemisExecutor extends Executor {
 
@@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor {
       };
    }
 
-   /** It will wait the current execution (if there is one) to finish
-    *  but will not complete any further executions */
-   default List<Runnable> shutdownNow() {
-      return Collections.emptyList();
+   /**
+    * It will wait the current execution (if there is one) to finish
+    * but will not complete any further executions.
+    *
+    * @param onPendingTask it will be called for each pending task found
+    * @return the number of pending tasks that won't be executed
+    */
+   default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
+      return 0;
+   }
+
+   /**
+    * It will wait the current execution (if there is one) to finish
+    * but will not complete any further executions
+    */
+   default int shutdownNow() {
+      return shutdownNow(t -> {
+      });
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 1c77a52..ff6d9a1 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,21 +17,19 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
 
 import org.jboss.logging.Logger;
 
 public abstract class ProcessorBase<T> extends HandlerBase {
 
    private static final Logger logger = Logger.getLogger(ProcessorBase.class);
-
    public static final int STATE_NOT_RUNNING = 0;
    public static final int STATE_RUNNING = 1;
    public static final int STATE_FORCED_SHUTDOWN = 2;
@@ -39,53 +37,50 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
    private final Executor delegate;
-
-   private final ExecutorTask task = new ExecutorTask();
+   /**
+    * Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
+    * when accessing ProcessorBase.this fields/methods.
+    */
+   private final Runnable task = this::executePendingTasks;
 
    // used by stateUpdater
    @SuppressWarnings("unused")
    private volatile int state = STATE_NOT_RUNNING;
-
+   // Request of forced shutdown
+   private volatile boolean requestedForcedShutdown = false;
+   // Request of educated shutdown:
    private volatile boolean requestedShutdown = false;
 
-   private volatile boolean started = true;
-
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
-   private final class ExecutorTask implements Runnable {
-
-      @Override
-      public void run() {
-         do {
-            //if there is no thread active and is not already dead then we run
-            if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
-               enter();
-               try {
-                  T task = tasks.poll();
-                  //while the queue is not empty we process in order
-                  while (task != null && !requestedShutdown) {
-                     //just drain the tasks if has been requested a shutdown to help the shutdown process
-                     if (requestedShutdown) {
-                        tasks.add(task);
-                        break;
-                     }
-                     doTask(task);
-                     task = tasks.poll();
-                  }
-               } finally {
-                  leave();
-                  //set state back to not running.
-                  stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
+   private void executePendingTasks() {
+      do {
+         //if there is no thread active and is not already dead then we run
+         if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+            enter();
+            try {
+               T task;
+               //while the queue is not empty we process in order:
+               //if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
+               while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
+                  doTask(task);
+               }
+            } finally {
+               leave();
+               //set state back to not running if possible: shutdownNow could be called by doTask(task).
+               //If a shutdown has happened there is no need to continue polling tasks
+               if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) {
+                  return;
                }
-            } else {
-               return;
             }
-            //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
-            //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
-            //this check fixes the issue
+         } else {
+            return;
          }
-         while (!tasks.isEmpty());
+         //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+         //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
+         //this check fixes the issue
       }
+      while (!tasks.isEmpty() && !requestedShutdown);
    }
 
    /**
@@ -96,7 +91,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public void shutdown(long timeout, TimeUnit unit) {
-      started = false;
+      requestedShutdown = true;
 
       if (!inHandler()) {
          // if it's in handler.. we just return
@@ -108,10 +103,10 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     * It will wait the current execution (if there is one) to finish
     * but will not complete any further executions
     */
-   public List<T> shutdownNow() {
+   public int shutdownNow(Consumer<? super T> onPendingItem) {
       //alert anyone that has been requested (at least) an immediate shutdown
+      requestedForcedShutdown = true;
       requestedShutdown = true;
-      started = false;
 
       if (inHandler()) {
          stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
@@ -121,7 +116,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
             //alert the ExecutorTask (if is running) to just drain the current backlog of tasks
             final int startState = stateUpdater.get(this);
             if (startState == STATE_FORCED_SHUTDOWN) {
-               //another thread has completed a forced shutdown
+               //another thread has completed a forced shutdown: let it to manage the tasks cleanup
                break;
             }
             if (startState == STATE_RUNNING) {
@@ -135,10 +130,16 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          //can be set by just one caller.
          //As noted on the execute method there is a small chance that some tasks would be enqueued
       }
-      ArrayList<T> returnList = new ArrayList<>(tasks);
-      tasks.clear();
-
-      return returnList;
+      int pendingItems = 0;
+      //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them
+      synchronized (tasks) {
+         T item;
+         while ((item = tasks.poll()) != null) {
+            onPendingItem.accept(item);
+            pendingItems++;
+         }
+      }
+      return pendingItems;
    }
 
    protected abstract void doTask(T task);
@@ -148,7 +149,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public final boolean isFlushed() {
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
+      return this.state == STATE_NOT_RUNNING;
    }
 
    /**
@@ -158,14 +159,14 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     * like in shutdown and failover situations.
     */
    public final boolean flush(long timeout, TimeUnit unit) {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+      if (this.state == STATE_NOT_RUNNING) {
          // quick test, most of the time it will be empty anyways
          return true;
       }
 
       long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
       try {
-         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
+         while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
 
             if (tasks.isEmpty()) {
                return true;
@@ -177,23 +178,42 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          // ignored
       }
 
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
+      return this.state == STATE_NOT_RUNNING;
    }
 
    protected void task(T command) {
-      if (!started) {
-         logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
+      if (requestedShutdown) {
+         logAddOnShutdown();
       }
       //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
       tasks.add(command);
       //cache locally the state to avoid multiple volatile loads
       final int state = stateUpdater.get(this);
-      if (state == STATE_FORCED_SHUTDOWN) {
-         //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
-         tasks.clear();
-      } else if (state == STATE_NOT_RUNNING) {
+      if (state != STATE_RUNNING) {
+         onAddedTaskIfNotRunning(state);
+      }
+   }
+
+   /**
+    * This has to be called on the assumption that state!=STATE_RUNNING.
+    * It is packed separately from {@link #task(Object)} just for performance reasons: it
+    * handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
+    */
+   private void onAddedTaskIfNotRunning(int state) {
+      if (state == STATE_NOT_RUNNING) {
          //startPoller could be deleted but is maintained because is inherited
          delegate.execute(task);
+      } else if (state == STATE_FORCED_SHUTDOWN) {
+         //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
+         synchronized (tasks) {
+            tasks.clear();
+         }
+      }
+   }
+
+   private static void logAddOnShutdown() {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Ordered executor has been gently shutdown at", new Exception("debug"));
       }
    }
 
@@ -208,7 +228,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public final int status() {
-      return stateUpdater.get(this);
+      //avoid using the updater because in older version of JDK 8 isn't optimized as a vanilla volatile get
+      return this.state;
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 4e2bbba..345cbb5 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -82,7 +82,7 @@ public class OrderedExecutorSanityTest {
 
 
    @Test
-   public void shutdownWithin() throws InterruptedException {
+   public void shutdownNowOnDelegateExecutor() throws InterruptedException {
       final ExecutorService executorService = Executors.newSingleThreadExecutor();
       try {
          final OrderedExecutor executor = new OrderedExecutor(executorService);
@@ -93,7 +93,7 @@ public class OrderedExecutorSanityTest {
          executor.execute(() -> {
             try {
                latch.await(1, TimeUnit.MINUTES);
-               numberOfTasks.set(executor.shutdownNow().size());
+               numberOfTasks.set(executor.shutdownNow());
                ran.countDown();
             } catch (Exception e) {
                e.printStackTrace();