You are viewing a plain-text version of this content. The hyperlink to the canonical version was lost in this plain-text copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/28 13:53:21 UTC

[activemq-artemis] branch master updated: ARTEMIS-2926 Scheduled task executions are skipped randomly

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e2c1848  ARTEMIS-2926 Scheduled task executions are skipped randomly
     new 4d6096f  This closes #3287
e2c1848 is described below

commit e2c1848da4a7e44c09f58bbb4c7f6905b7642a7d
Author: franz1981 <ni...@gmail.com>
AuthorDate: Fri Oct 2 18:58:26 2020 +0200

    ARTEMIS-2926 Scheduled task executions are skipped randomly
    
    Make scheduled tasks more reliable when the
    scheduledComponent.delay() method is used, and prevent
    periodic tasks from being skipped even when they are
    correctly timed.
---
 .../core/server/ActiveMQScheduledComponent.java    | 131 ++++++++++++---------
 .../utils/ActiveMQScheduledComponentTest.java      |  16 +++
 2 files changed, 92 insertions(+), 55 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 2ccea9f..5ff56e5 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -20,11 +20,12 @@ package org.apache.activemq.artemis.core.server;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.jboss.logging.Logger;
@@ -41,15 +42,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
    /** initialDelay < 0 would mean no initial delay, use the period instead */
    private long initialDelay;
    private long period;
-   private long millisecondsPeriod;
    private TimeUnit timeUnit;
    private final Executor executor;
-   private volatile ScheduledFuture future;
+   private volatile boolean isStarted;
+   private ScheduledFuture future;
    private final boolean onDemand;
-
-   long lastTime = 0;
-
-   private final AtomicInteger delayed = new AtomicInteger(0);
+   // The start/stop actions shouldn't interact concurrently with delay so it doesn't need to be volatile
+   private AtomicBoolean bookedForRunning;
 
    /**
     * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
@@ -73,6 +72,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
       this.period = checkPeriod;
       this.timeUnit = timeUnit;
       this.onDemand = onDemand;
+      this.bookedForRunning = new AtomicBoolean(false);
+      this.isStarted = false;
    }
 
    /**
@@ -89,12 +90,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
                                      long checkPeriod,
                                      TimeUnit timeUnit,
                                      boolean onDemand) {
-      this.executor = null;
-      this.scheduledExecutorService = scheduledExecutorService;
-      this.initialDelay = initialDelay;
-      this.period = checkPeriod;
-      this.timeUnit = timeUnit;
-      this.onDemand = onDemand;
+      this(scheduledExecutorService, null, initialDelay, checkPeriod, timeUnit, onDemand);
    }
 
    /**
@@ -150,11 +146,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
 
    @Override
    public synchronized void start() {
-      if (future != null) {
+      if (isStarted) {
          // already started
          return;
       }
-
+      isStarted = true;
       if (scheduledExecutorService == null) {
          scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
          startedOwnScheduler = true;
@@ -165,10 +161,9 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
          return;
       }
 
-      this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
-
       if (period >= 0) {
-         future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
+         final AtomicBoolean booked = this.bookedForRunning;
+         future = scheduledExecutorService.scheduleWithFixedDelay(() -> runForScheduler(booked), initialDelay >= 0 ? initialDelay : period, period, timeUnit);
       } else {
          logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
       }
@@ -188,15 +183,26 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
 
    }
 
-   public void delay() {
-      int value = delayed.incrementAndGet();
-      if (value > 10) {
-         delayed.decrementAndGet();
-      } else {
-         // We only schedule up to 10 periods upfront.
-         // this is to avoid a window where a current one would be running and a next one is coming.
-         // in theory just 2 would be enough. I'm using 10 as a precaution here.
-         scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
+   /**
+    * A delay request can succeed only if:
+    * <ul>
+    *    <li>there is no other pending delay request
+    *    <li>there is no pending execution request
+    * </ul>
+    * <p>
+    * When a delay request succeed it schedule a new execution to happen in {@link #getPeriod()}.<br>
+    */
+   public boolean delay() {
+      final AtomicBoolean booked = this.bookedForRunning;
+      if (!booked.compareAndSet(false, true)) {
+         return false;
+      }
+      try {
+         scheduledExecutorService.schedule(() -> bookedRunForScheduler(booked), period, timeUnit);
+         return true;
+      } catch (RejectedExecutionException e) {
+         booked.set(false);
+         throw e;
       }
    }
 
@@ -261,7 +267,14 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
    }
 
    @Override
-   public void stop() {
+   public synchronized void stop() {
+      if (!isStarted) {
+         return;
+      }
+      isStarted = false;
+      // Replace the existing one: a new periodic task or any new delay after stop
+      // won't interact with the previously running ones
+      this.bookedForRunning = new AtomicBoolean(false);
       if (future != null) {
          future.cancel(false);
          future = null;
@@ -275,8 +288,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
    }
 
    @Override
-   public synchronized boolean isStarted() {
-      return future != null;
+   public boolean isStarted() {
+      return isStarted;
    }
 
    // this will restart the scheduled component upon changes
@@ -287,35 +300,43 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
       }
    }
 
-   final Runnable runForExecutor = new Runnable() {
-      @Override
-      public void run() {
-         if (onDemand && delayed.get() > 0) {
-            delayed.decrementAndGet();
-         }
+   private void runForExecutor(AtomicBoolean booked) {
+      // It unblocks:
+      // - a new delay request
+      // - next periodic run request (in case of executor != null)
+      // Although tempting, don't move this one after ActiveMQScheduledComponent.this.run():
+      // - it can cause "delay" to change semantic ie a racing delay while finished executing the task, won't succeed
+      // - it won't prevent "slow tasks" to accumulate, because slowness cannot be measured inside running method;
+      //   it just cause skipping runs for perfectly timed executions too
+      boolean alwaysTrue = booked.compareAndSet(true, false);
+      assert alwaysTrue;
+      ActiveMQScheduledComponent.this.run();
+   }
 
-         if (!onDemand && lastTime > 0) {
-            if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
-               logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
-               return;
+   private void bookedRunForScheduler(AtomicBoolean booked) {
+      assert booked.get();
+      if (executor != null) {
+         try {
+            executor.execute(() -> runForExecutor(booked));
+         } catch (RejectedExecutionException e) {
+            if (booked != null) {
+               booked.set(false);
             }
+            throw e;
          }
-
-         lastTime = System.currentTimeMillis();
-
-         ActiveMQScheduledComponent.this.run();
+      } else {
+         runForExecutor(booked);
       }
-   };
-
-   final Runnable runForScheduler = new Runnable() {
-      @Override
-      public void run() {
-         if (executor != null) {
-            executor.execute(runForExecutor);
-         } else {
-            runForExecutor.run();
-         }
+   }
+
+   private void runForScheduler(AtomicBoolean booked) {
+      if (!booked.compareAndSet(false, true)) {
+         // let's skip this execution because there is:
+         // - a previously submitted period task yet to start -> executor is probably overbooked!
+         // - a pending delay request
+         return;
       }
-   };
+      bookedRunForScheduler(booked);
+   }
 
 }
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
index aa67582..48e0711 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -79,6 +79,22 @@ public class ActiveMQScheduledComponentTest {
    }
 
    @Test
+   public void testSubMillisDelay() throws InterruptedException {
+      final CountDownLatch triggered = new CountDownLatch(2);
+      final long nsInterval = TimeUnit.MICROSECONDS.toNanos(900);
+      final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, nsInterval, TimeUnit.NANOSECONDS, false) {
+
+         @Override
+         public void run() {
+            triggered.countDown();
+         }
+      };
+      local.start();
+      Assert.assertTrue(triggered.await(10, TimeUnit.SECONDS));
+      local.stop();
+   }
+
+   @Test
    public void testVerifyInitialDelayChanged() {
       final long initialDelay = 10;
       final long period = 100;