You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/28 13:53:21 UTC
[activemq-artemis] branch master updated: ARTEMIS-2926 Scheduled
task executions are skipped randomly
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new e2c1848 ARTEMIS-2926 Scheduled task executions are skipped randomly
new 4d6096f This closes #3287
e2c1848 is described below
commit e2c1848da4a7e44c09f58bbb4c7f6905b7642a7d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Oct 2 18:58:26 2020 +0200
ARTEMIS-2926 Scheduled task executions are skipped randomly
Make scheduled tasks more reliable when the
scheduledComponent.delay() method is used, and prevent
periodic tasks from being skipped even though they run on correct timing
---
.../core/server/ActiveMQScheduledComponent.java | 131 ++++++++++++---------
.../utils/ActiveMQScheduledComponentTest.java | 16 +++
2 files changed, 92 insertions(+), 55 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 2ccea9f..5ff56e5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -20,11 +20,12 @@ package org.apache.activemq.artemis.core.server;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
@@ -41,15 +42,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
- private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
- private volatile ScheduledFuture future;
+ private volatile boolean isStarted;
+ private ScheduledFuture future;
private final boolean onDemand;
-
- long lastTime = 0;
-
- private final AtomicInteger delayed = new AtomicInteger(0);
+ // The start/stop actions shouldn't interact concurrently with delay so it doesn't need to be volatile
+ private AtomicBoolean bookedForRunning;
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
@@ -73,6 +72,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
+ this.bookedForRunning = new AtomicBoolean(false);
+ this.isStarted = false;
}
/**
@@ -89,12 +90,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
- this.executor = null;
- this.scheduledExecutorService = scheduledExecutorService;
- this.initialDelay = initialDelay;
- this.period = checkPeriod;
- this.timeUnit = timeUnit;
- this.onDemand = onDemand;
+ this(scheduledExecutorService, null, initialDelay, checkPeriod, timeUnit, onDemand);
}
/**
@@ -150,11 +146,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
@Override
public synchronized void start() {
- if (future != null) {
+ if (isStarted) {
// already started
return;
}
-
+ isStarted = true;
if (scheduledExecutorService == null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
startedOwnScheduler = true;
@@ -165,10 +161,9 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
return;
}
- this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
-
if (period >= 0) {
- future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
+ final AtomicBoolean booked = this.bookedForRunning;
+ future = scheduledExecutorService.scheduleWithFixedDelay(() -> runForScheduler(booked), initialDelay >= 0 ? initialDelay : period, period, timeUnit);
} else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
@@ -188,15 +183,26 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
- public void delay() {
- int value = delayed.incrementAndGet();
- if (value > 10) {
- delayed.decrementAndGet();
- } else {
- // We only schedule up to 10 periods upfront.
- // this is to avoid a window where a current one would be running and a next one is coming.
- // in theory just 2 would be enough. I'm using 10 as a precaution here.
- scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
+ /**
+ * A delay request can succeed only if:
+ * <ul>
+ * <li>there is no other pending delay request
+ * <li>there is no pending execution request
+ * </ul>
+ * <p>
+ * When a delay request succeeds, it schedules a new execution to happen after {@link #getPeriod()}.<br>
+ */
+ public boolean delay() {
+ final AtomicBoolean booked = this.bookedForRunning;
+ if (!booked.compareAndSet(false, true)) {
+ return false;
+ }
+ try {
+ scheduledExecutorService.schedule(() -> bookedRunForScheduler(booked), period, timeUnit);
+ return true;
+ } catch (RejectedExecutionException e) {
+ booked.set(false);
+ throw e;
}
}
@@ -261,7 +267,14 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
- public void stop() {
+ public synchronized void stop() {
+ if (!isStarted) {
+ return;
+ }
+ isStarted = false;
+ // Replace the existing one: a new periodic task or any new delay after stop
+ // won't interact with the previously running ones
+ this.bookedForRunning = new AtomicBoolean(false);
if (future != null) {
future.cancel(false);
future = null;
@@ -275,8 +288,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
- public synchronized boolean isStarted() {
- return future != null;
+ public boolean isStarted() {
+ return isStarted;
}
// this will restart the scheduled component upon changes
@@ -287,35 +300,43 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
}
- final Runnable runForExecutor = new Runnable() {
- @Override
- public void run() {
- if (onDemand && delayed.get() > 0) {
- delayed.decrementAndGet();
- }
+ private void runForExecutor(AtomicBoolean booked) {
+ // It unblocks:
+ // - a new delay request
+ // - next periodic run request (in case of executor != null)
+ // Although tempting, don't move this one after ActiveMQScheduledComponent.this.run():
+ // - it would change the semantics of "delay", i.e. a delay request racing with a task that has just finished executing wouldn't succeed
+ // - it wouldn't prevent "slow tasks" from accumulating, because slowness cannot be measured inside the running method;
+ // it would just cause perfectly timed executions to be skipped too
+ boolean alwaysTrue = booked.compareAndSet(true, false);
+ assert alwaysTrue;
+ ActiveMQScheduledComponent.this.run();
+ }
- if (!onDemand && lastTime > 0) {
- if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
- logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
- return;
+ private void bookedRunForScheduler(AtomicBoolean booked) {
+ assert booked.get();
+ if (executor != null) {
+ try {
+ executor.execute(() -> runForExecutor(booked));
+ } catch (RejectedExecutionException e) {
+ if (booked != null) {
+ booked.set(false);
}
+ throw e;
}
-
- lastTime = System.currentTimeMillis();
-
- ActiveMQScheduledComponent.this.run();
+ } else {
+ runForExecutor(booked);
}
- };
-
- final Runnable runForScheduler = new Runnable() {
- @Override
- public void run() {
- if (executor != null) {
- executor.execute(runForExecutor);
- } else {
- runForExecutor.run();
- }
+ }
+
+ private void runForScheduler(AtomicBoolean booked) {
+ if (!booked.compareAndSet(false, true)) {
+ // let's skip this execution because there is:
+ // - a previously submitted period task yet to start -> executor is probably overbooked!
+ // - a pending delay request
+ return;
}
- };
+ bookedRunForScheduler(booked);
+ }
}
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
index aa67582..48e0711 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -79,6 +79,22 @@ public class ActiveMQScheduledComponentTest {
}
@Test
+ public void testSubMillisDelay() throws InterruptedException {
+ final CountDownLatch triggered = new CountDownLatch(2);
+ final long nsInterval = TimeUnit.MICROSECONDS.toNanos(900);
+ final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, nsInterval, TimeUnit.NANOSECONDS, false) {
+
+ @Override
+ public void run() {
+ triggered.countDown();
+ }
+ };
+ local.start();
+ Assert.assertTrue(triggered.await(10, TimeUnit.SECONDS));
+ local.stop();
+ }
+
+ @Test
public void testVerifyInitialDelayChanged() {
final long initialDelay = 10;
final long period = 100;