You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:58 UTC
[6/7] activemq-artemis git commit: ARTEMIS-1495 Few perf improvements
to: - reduce volatile loads - allow method inlining for hot execution paths -
reduced pointers chasing due to inner classes uses
ARTEMIS-1495 Few perf improvements to:
- reduce volatile loads
- allow method inlining for hot execution paths
- reduced pointers chasing due to inner classes uses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33b3eb6f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33b3eb6f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33b3eb6f
Branch: refs/heads/master
Commit: 33b3eb6f095da4a21648c268c7a960e55f414ca3
Parents: 91db080
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Nov 9 11:26:21 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500
----------------------------------------------------------------------
.../artemis/utils/actors/ArtemisExecutor.java | 25 +++-
.../artemis/utils/actors/ProcessorBase.java | 135 +++++++++++--------
.../utils/actors/OrderedExecutorSanityTest.java | 4 +-
3 files changed, 99 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 8efb3d3..9903d65 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,11 +17,10 @@
package org.apache.activemq.artemis.utils.actors;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
public interface ArtemisExecutor extends Executor {
@@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor {
};
}
- /** It will wait the current execution (if there is one) to finish
- * but will not complete any further executions */
- default List<Runnable> shutdownNow() {
- return Collections.emptyList();
+ /**
+ * It will wait the current execution (if there is one) to finish
+ * but will not complete any further executions.
+ *
+ * @param onPendingTask it will be called for each pending task found
+ * @return the number of pending tasks that won't be executed
+ */
+ default int shutdownNow(Consumer<? super Runnable> onPendingTask) {
+ return 0;
+ }
+
+ /**
+ * It will wait the current execution (if there is one) to finish
+ * but will not complete any further executions
+ */
+ default int shutdownNow() {
+ return shutdownNow(t -> {
+ });
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 1c77a52..ff6d9a1 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,21 +17,19 @@
package org.apache.activemq.artemis.utils.actors;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
import org.jboss.logging.Logger;
public abstract class ProcessorBase<T> extends HandlerBase {
private static final Logger logger = Logger.getLogger(ProcessorBase.class);
-
public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2;
@@ -39,53 +37,50 @@ public abstract class ProcessorBase<T> extends HandlerBase {
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
private final Executor delegate;
-
- private final ExecutorTask task = new ExecutorTask();
+ /**
+ * Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
+ * when accessing ProcessorBase.this fields/methods.
+ */
+ private final Runnable task = this::executePendingTasks;
// used by stateUpdater
@SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING;
-
+ // Request of forced shutdown
+ private volatile boolean requestedForcedShutdown = false;
+ // Request of educated shutdown:
private volatile boolean requestedShutdown = false;
- private volatile boolean started = true;
-
private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
- private final class ExecutorTask implements Runnable {
-
- @Override
- public void run() {
- do {
- //if there is no thread active and is not already dead then we run
- if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
- enter();
- try {
- T task = tasks.poll();
- //while the queue is not empty we process in order
- while (task != null && !requestedShutdown) {
- //just drain the tasks if has been requested a shutdown to help the shutdown process
- if (requestedShutdown) {
- tasks.add(task);
- break;
- }
- doTask(task);
- task = tasks.poll();
- }
- } finally {
- leave();
- //set state back to not running.
- stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING);
+ private void executePendingTasks() {
+ do {
+ //if there is no thread active and is not already dead then we run
+ if (stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+ enter();
+ try {
+ T task;
+ //while the queue is not empty we process in order:
+ //if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
+ while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
+ doTask(task);
+ }
+ } finally {
+ leave();
+ //set state back to not running if possible: shutdownNow could be called by doTask(task).
+ //If a shutdown has happened there is no need to continue polling tasks
+ if (!stateUpdater.compareAndSet(this, STATE_RUNNING, STATE_NOT_RUNNING)) {
+ return;
}
- } else {
- return;
}
- //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
- //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
- //this check fixes the issue
+ } else {
+ return;
}
- while (!tasks.isEmpty());
+ //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+ //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
+ //this check fixes the issue
}
+ while (!tasks.isEmpty() && !requestedShutdown);
}
/**
@@ -96,7 +91,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
public void shutdown(long timeout, TimeUnit unit) {
- started = false;
+ requestedShutdown = true;
if (!inHandler()) {
// if it's in handler.. we just return
@@ -108,10 +103,10 @@ public abstract class ProcessorBase<T> extends HandlerBase {
* It will wait the current execution (if there is one) to finish
* but will not complete any further executions
*/
- public List<T> shutdownNow() {
+ public int shutdownNow(Consumer<? super T> onPendingItem) {
//alert anyone that has been requested (at least) an immediate shutdown
+ requestedForcedShutdown = true;
requestedShutdown = true;
- started = false;
if (inHandler()) {
stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
@@ -121,7 +116,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//alert the ExecutorTask (if is running) to just drain the current backlog of tasks
final int startState = stateUpdater.get(this);
if (startState == STATE_FORCED_SHUTDOWN) {
- //another thread has completed a forced shutdown
+ //another thread has completed a forced shutdown: let it to manage the tasks cleanup
break;
}
if (startState == STATE_RUNNING) {
@@ -135,10 +130,16 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//can be set by just one caller.
//As noted on the execute method there is a small chance that some tasks would be enqueued
}
- ArrayList<T> returnList = new ArrayList<>(tasks);
- tasks.clear();
-
- return returnList;
+ int pendingItems = 0;
+ //there is a small chance that execute() could race with this cleanup: the lock allow an all-or-nothing behaviour between them
+ synchronized (tasks) {
+ T item;
+ while ((item = tasks.poll()) != null) {
+ onPendingItem.accept(item);
+ pendingItems++;
+ }
+ }
+ return pendingItems;
}
protected abstract void doTask(T task);
@@ -148,7 +149,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
public final boolean isFlushed() {
- return stateUpdater.get(this) == STATE_NOT_RUNNING;
+ return this.state == STATE_NOT_RUNNING;
}
/**
@@ -158,14 +159,14 @@ public abstract class ProcessorBase<T> extends HandlerBase {
* like in shutdown and failover situations.
*/
public final boolean flush(long timeout, TimeUnit unit) {
- if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+ if (this.state == STATE_NOT_RUNNING) {
// quick test, most of the time it will be empty anyways
return true;
}
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
try {
- while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
+ while (this.state == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
if (tasks.isEmpty()) {
return true;
@@ -177,23 +178,42 @@ public abstract class ProcessorBase<T> extends HandlerBase {
// ignored
}
- return stateUpdater.get(this) == STATE_NOT_RUNNING;
+ return this.state == STATE_NOT_RUNNING;
}
protected void task(T command) {
- if (!started) {
- logger.debug("Ordered executor has been shutdown at", new Exception("debug"));
+ if (requestedShutdown) {
+ logAddOnShutdown();
}
//The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks
tasks.add(command);
//cache locally the state to avoid multiple volatile loads
final int state = stateUpdater.get(this);
- if (state == STATE_FORCED_SHUTDOWN) {
- //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add
- tasks.clear();
- } else if (state == STATE_NOT_RUNNING) {
+ if (state != STATE_RUNNING) {
+ onAddedTaskIfNotRunning(state);
+ }
+ }
+
+ /**
+ * This has to be called on the assumption that state!=STATE_RUNNING.
+ * It is packed separately from {@link #task(Object)} just for performance reasons: it
+ * handles the uncommon execution cases for bursty scenarios i.e. the slowest execution path.
+ */
+ private void onAddedTaskIfNotRunning(int state) {
+ if (state == STATE_NOT_RUNNING) {
//startPoller could be deleted but is maintained because is inherited
delegate.execute(task);
+ } else if (state == STATE_FORCED_SHUTDOWN) {
+ //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
+ synchronized (tasks) {
+ tasks.clear();
+ }
+ }
+ }
+
+ private static void logAddOnShutdown() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ordered executor has been gently shutdown at", new Exception("debug"));
}
}
@@ -208,7 +228,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
public final int status() {
- return stateUpdater.get(this);
+ //avoid using the updater because in older version of JDK 8 isn't optimized as a vanilla volatile get
+ return this.state;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 4e2bbba..345cbb5 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -82,7 +82,7 @@ public class OrderedExecutorSanityTest {
@Test
- public void shutdownWithin() throws InterruptedException {
+ public void shutdownNowOnDelegateExecutor() throws InterruptedException {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final OrderedExecutor executor = new OrderedExecutor(executorService);
@@ -93,7 +93,7 @@ public class OrderedExecutorSanityTest {
executor.execute(() -> {
try {
latch.await(1, TimeUnit.MINUTES);
- numberOfTasks.set(executor.shutdownNow().size());
+ numberOfTasks.set(executor.shutdownNow());
ran.countDown();
} catch (Exception e) {
e.printStackTrace();