You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/01 07:08:25 UTC
[camel] 03/03: Refactoring BackgroundTask - made Latch and completed to members
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f7cd5cab83228cd8e7bb77842976d1f9ed6ded43
Author: Michael Lück <gi...@lueckonline.net>
AuthorDate: Tue Nov 29 19:06:35 2022 +0100
Refactoring BackgroundTask - made Latch and completed to members
As the BackgroundTask is stateful, we can simply make the latch and the
completed flag (introduced for CAMEL-18766) members of the BackgroundTask
(see the discussion in GitHub pull request 8785).
---
.../apache/camel/support/task/BackgroundTask.java | 30 ++++++++--------------
.../apache/camel/support/task/BlockingTask.java | 2 +-
2 files changed, 12 insertions(+), 20 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 035e795b422..6fed38feb10 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
import org.apache.camel.support.task.budget.TimeBoundedBudget;
import org.apache.camel.support.task.budget.TimeBudget;
@@ -77,7 +76,10 @@ public class BackgroundTask implements BlockingTask {
private final TimeBudget budget;
private final ScheduledExecutorService service;
private final String name;
+ private final CountDownLatch latch = new CountDownLatch(1);
+
private Duration elapsed = Duration.ZERO;
+ private boolean completed;
BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) {
this.budget = budget;
@@ -85,7 +87,7 @@ public class BackgroundTask implements BlockingTask {
this.name = name;
}
- private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) {
+ private void runTaskWrapper(BooleanSupplier supplier) {
LOG.trace("Current latch value: {}", latch.getCount());
if (latch.getCount() == 0) {
return;
@@ -93,13 +95,13 @@ public class BackgroundTask implements BlockingTask {
if (!budget.next()) {
LOG.warn("The task {} does not have more budget to continue running", name);
- result.completed = false;
+ completed = false;
latch.countDown();
return;
}
if (supplier.getAsBoolean()) {
- result.completed = true;
+ completed = true;
latch.countDown();
LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
}
@@ -107,21 +109,15 @@ public class BackgroundTask implements BlockingTask {
@Override
public boolean run(BooleanSupplier supplier) {
- final CountDownLatch latch = new CountDownLatch(1);
- // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which
- // is executed by the by ExecutorService
- // the result will define whether the task executed successfully or has been ended because the budget has been consumed
- // that will be used as the return value of this method (see description of super.run(...) methods
- final ExecutionResult result = new ExecutionResult();
-
- Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(),
+
+ Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(),
budget.interval(), TimeUnit.MILLISECONDS);
- waitForTaskCompletion(latch, task);
- return result.completed;
+ waitForTaskCompletion(task);
+ return completed;
}
- private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
+ private void waitForTaskCompletion(Future<?> task) {
try {
// We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
// This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
@@ -148,8 +144,4 @@ public class BackgroundTask implements BlockingTask {
public Duration elapsed() {
return elapsed;
}
-
- private static class ExecutionResult {
- private boolean completed;
- }
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
index 757a7bcea20..ec17ea6f980 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
@@ -37,7 +37,7 @@ public interface BlockingTask extends Task {
* task was interrupted.
*/
default <T> boolean run(Predicate<T> predicate, T payload) {
- return this.run(() -> predicate.test(payload));
+ return this.run(() -> predicate.test(payload));
}
/**