You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/02 09:16:02 UTC

[camel] branch camel-3.18.x updated: CAMEL-18766: backport Background task fixes (#8814)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 56e9704564d CAMEL-18766: backport Background task fixes (#8814)
56e9704564d is described below

commit 56e9704564dae38a1289ab60f4c7fd1deba7a35a
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Fri Dec 2 10:15:55 2022 +0100

    CAMEL-18766: backport Background task fixes (#8814)
    
    * CAMEL-18766: background tasks without maxDuration are reschedulable
    
    Make sure the BackgroundTask thread is ended after maxIterations even if
    unlimitedMaxDurations is used, but maintain the possibility to retry forever
    if maxIterations has not been set.
    
    Also some code cleanup: encapsulate predicate execution into
    Supplier<Boolean>. This means we only need one runTaskWrapper, which
    reduces code duplication. For example, we can fix the maxIteration bug in
    a single place.
    
    Also: minor change in comment, Co-authored-by: Nicolas Filotto <es...@users.noreply.github.com>
    
    * default implementation for run(Predicate, payload) in BlockingTask
    
    * Refactoring BackgroundTask - made Latch and completed to members
    
    As the BackgroundTask is stateful, we can just add the latch and the
    completed flag (introduced for CAMEL-18766) as members of the Background
    task (see discussion in github pull request 8785).
    
    Co-authored-by: Michael Lück <mi...@lueckonline.net>
    Co-authored-by: Michael Lück <gi...@lueckonline.net>
---
 .../apache/camel/support/task/BackgroundTask.java  | 51 +++++-----------------
 .../apache/camel/support/task/BlockingTask.java    |  4 +-
 .../apache/camel/support/task/ForegroundTask.java  | 30 -------------
 .../task/BackgroundIterationTimeTaskTest.java      |  3 +-
 4 files changed, 15 insertions(+), 73 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 4909107e496..6fed38feb10 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
 
 import org.apache.camel.support.task.budget.TimeBoundedBudget;
 import org.apache.camel.support.task.budget.TimeBudget;
@@ -77,7 +76,10 @@ public class BackgroundTask implements BlockingTask {
     private final TimeBudget budget;
     private final ScheduledExecutorService service;
     private final String name;
+    private final CountDownLatch latch = new CountDownLatch(1);
+
     private Duration elapsed = Duration.ZERO;
+    private boolean completed;
 
     BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) {
         this.budget = budget;
@@ -85,78 +87,47 @@ public class BackgroundTask implements BlockingTask {
         this.name = name;
     }
 
-    private <T> void runTaskWrapper(CountDownLatch latch, Predicate<T> predicate, T payload) {
+    private void runTaskWrapper(BooleanSupplier supplier) {
         LOG.trace("Current latch value: {}", latch.getCount());
-
         if (latch.getCount() == 0) {
             return;
         }
 
         if (!budget.next()) {
             LOG.warn("The task {} does not have more budget to continue running", name);
-
-            return;
-        }
-
-        if (predicate.test(payload)) {
+            completed = false;
             latch.countDown();
-            LOG.trace("Task {} has succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
-        }
-    }
-
-    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier) {
-        LOG.trace("Current latch value: {}", latch.getCount());
-        if (latch.getCount() == 0) {
-            return;
-        }
-
-        if (!budget.next()) {
-            LOG.warn("The task {} does not have more budget to continue running", name);
-
             return;
         }
 
         if (supplier.getAsBoolean()) {
+            completed = true;
             latch.countDown();
             LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
         }
     }
 
-    @Override
-    public <T> boolean run(Predicate<T> predicate, T payload) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
-                budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS);
-
-        return waitForTaskCompletion(latch, task);
-    }
-
     @Override
     public boolean run(BooleanSupplier supplier) {
-        CountDownLatch latch = new CountDownLatch(1);
 
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, task);
+        waitForTaskCompletion(task);
+        return completed;
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
-        boolean completed = false;
+    private void waitForTaskCompletion(Future<?> task) {
         try {
             // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
             // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
-                completed = true;
             } else {
                 if (!latch.await(budget.maxDuration(), TimeUnit.MILLISECONDS)) {
                     LOG.debug("Timeout out waiting for the completion of the task");
                 } else {
                     LOG.debug("The task has finished the execution and it is ready to continue");
-
-                    completed = true;
                 }
             }
 
@@ -167,8 +138,6 @@ public class BackgroundTask implements BlockingTask {
         } finally {
             elapsed = budget.elapsed();
         }
-
-        return completed;
     }
 
     @Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
index a7e93e1d572..ec17ea6f980 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
@@ -36,7 +36,9 @@ public interface BlockingTask extends Task {
      * @return           true if the task has completed successfully or false if: 1) the budget is exhausted or 2) the
      *                   task was interrupted.
      */
-    <T> boolean run(Predicate<T> predicate, T payload);
+    default <T> boolean run(Predicate<T> predicate, T payload) {
+        return this.run(() -> predicate.test(payload));
+    }
 
     /**
      * Run the task
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
index deefe7e8d78..e5e32df2242 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
@@ -80,36 +80,6 @@ public class ForegroundTask implements BlockingTask {
         this.name = name;
     }
 
-    @Override
-    public <T> boolean run(Predicate<T> predicate, T payload) {
-        boolean completed = false;
-        try {
-            if (budget.initialDelay() > 0) {
-                Thread.sleep(budget.initialDelay());
-            }
-
-            while (budget.next()) {
-                if (predicate.test(payload)) {
-                    LOG.debug("Task {} is complete after {} iterations and it is ready to continue",
-                            name, budget.iteration());
-                    completed = true;
-                    break;
-                }
-
-                if (budget.canContinue()) {
-                    Thread.sleep(budget.interval());
-                }
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted {} while waiting for the repeatable task to finish", name);
-            Thread.currentThread().interrupt();
-        } finally {
-            elapsed = budget.elapsed();
-        }
-
-        return completed;
-    }
-
     @Override
     public boolean run(BooleanSupplier supplier) {
         boolean completed = false;
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index cd5f7fb83e5..b7f112e1a2c 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -42,7 +42,8 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport {
                         .withMaxIterations(3)
                         .withInterval(Duration.ofSeconds(1))
                         .withInitialDelay(Duration.ZERO)
-                        .withMaxDuration(Duration.ofSeconds(5))
+                        // use unlimited duration so we're sure that the task is really canceled after maxIterations
+                        .withUnlimitedDuration()
                         .build())
                 .build();