You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/01 07:08:25 UTC

[camel] 03/03: Refactoring BackgroundTask - made Latch and completed to members

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f7cd5cab83228cd8e7bb77842976d1f9ed6ded43
Author: Michael Lück <gi...@lueckonline.net>
AuthorDate: Tue Nov 29 19:06:35 2022 +0100

    Refactoring BackgroundTask - made the latch and the completed flag members
    
    As the BackgroundTask is stateful, we can simply make the latch and the
    completed flag (introduced for CAMEL-18766) members of BackgroundTask
    (see the discussion in GitHub pull request 8785).
---
 .../apache/camel/support/task/BackgroundTask.java  | 30 ++++++++--------------
 .../apache/camel/support/task/BlockingTask.java    |  2 +-
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 035e795b422..6fed38feb10 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
 
 import org.apache.camel.support.task.budget.TimeBoundedBudget;
 import org.apache.camel.support.task.budget.TimeBudget;
@@ -77,7 +76,10 @@ public class BackgroundTask implements BlockingTask {
     private final TimeBudget budget;
     private final ScheduledExecutorService service;
     private final String name;
+    private final CountDownLatch latch = new CountDownLatch(1);
+
     private Duration elapsed = Duration.ZERO;
+    private boolean completed;
 
     BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) {
         this.budget = budget;
@@ -85,7 +87,7 @@ public class BackgroundTask implements BlockingTask {
         this.name = name;
     }
 
-    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) {
+    private void runTaskWrapper(BooleanSupplier supplier) {
         LOG.trace("Current latch value: {}", latch.getCount());
         if (latch.getCount() == 0) {
             return;
@@ -93,13 +95,13 @@ public class BackgroundTask implements BlockingTask {
 
         if (!budget.next()) {
             LOG.warn("The task {} does not have more budget to continue running", name);
-            result.completed = false;
+            completed = false;
             latch.countDown();
             return;
         }
 
         if (supplier.getAsBoolean()) {
-            result.completed = true;
+            completed = true;
             latch.countDown();
             LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
         }
@@ -107,21 +109,15 @@ public class BackgroundTask implements BlockingTask {
 
     @Override
     public boolean run(BooleanSupplier supplier) {
-        final CountDownLatch latch = new CountDownLatch(1);
-        // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which 
-        // is executed by the by ExecutorService
-        // the result will define whether the task executed successfully or has been ended because the budget has been consumed
-        // that will be used as the return value of this method (see description of super.run(...) methods
-        final ExecutionResult result = new ExecutionResult();
-
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(),
+
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        waitForTaskCompletion(latch, task);
-        return result.completed;
+        waitForTaskCompletion(task);
+        return completed;
     }
 
-    private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
+    private void waitForTaskCompletion(Future<?> task) {
         try {
             // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
             // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
@@ -148,8 +144,4 @@ public class BackgroundTask implements BlockingTask {
     public Duration elapsed() {
         return elapsed;
     }
-
-    private static class ExecutionResult {
-        private boolean completed;
-    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
index 757a7bcea20..ec17ea6f980 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
@@ -37,7 +37,7 @@ public interface BlockingTask extends Task {
      *                   task was interrupted.
      */
     default <T> boolean run(Predicate<T> predicate, T payload) {
-    	return this.run(() -> predicate.test(payload));
+        return this.run(() -> predicate.test(payload));
     }
 
     /**