You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/01 07:08:22 UTC

[camel] branch main updated (e487506ef99 -> f7cd5cab832)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from e487506ef99 Polished
     new bf6d3a0f75f CAMEL-18766: background tasks without maxDuration are reschedulable
     new a5764537c61 default implementation for run(Predicate, payload) in BlockingTask
     new f7cd5cab832 Refactoring BackgroundTask - made Latch and completed to members

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/support/task/BackgroundTask.java  | 51 +++++-----------------
 .../apache/camel/support/task/BlockingTask.java    |  4 +-
 .../apache/camel/support/task/ForegroundTask.java  | 30 -------------
 .../task/BackgroundIterationTimeTaskTest.java      |  3 +-
 4 files changed, 15 insertions(+), 73 deletions(-)


[camel] 02/03: default implementation for run(Predicate, payload) in BlockingTask

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a5764537c61506759cf2a8d578714b0029e61f96
Author: Michael Lück <gi...@lueckonline.net>
AuthorDate: Mon Nov 28 20:00:29 2022 +0100

    default implementation for run(Predicate, payload) in BlockingTask
---
 .../apache/camel/support/task/BackgroundTask.java  |  5 ----
 .../apache/camel/support/task/BlockingTask.java    |  4 ++-
 .../apache/camel/support/task/ForegroundTask.java  | 30 ----------------------
 3 files changed, 3 insertions(+), 36 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index c7e63fb50e9..035e795b422 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -105,11 +105,6 @@ public class BackgroundTask implements BlockingTask {
         }
     }
 
-    @Override
-    public <T> boolean run(Predicate<T> predicate, T payload) {
-        return this.run(() -> predicate.test(payload));
-    }
-
     @Override
     public boolean run(BooleanSupplier supplier) {
         final CountDownLatch latch = new CountDownLatch(1);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
index a7e93e1d572..757a7bcea20 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
@@ -36,7 +36,9 @@ public interface BlockingTask extends Task {
      * @return           true if the task has completed successfully or false if: 1) the budget is exhausted or 2) the
      *                   task was interrupted.
      */
-    <T> boolean run(Predicate<T> predicate, T payload);
+    default <T> boolean run(Predicate<T> predicate, T payload) {
+    	return this.run(() -> predicate.test(payload));
+    }
 
     /**
      * Run the task
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
index deefe7e8d78..e5e32df2242 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
@@ -80,36 +80,6 @@ public class ForegroundTask implements BlockingTask {
         this.name = name;
     }
 
-    @Override
-    public <T> boolean run(Predicate<T> predicate, T payload) {
-        boolean completed = false;
-        try {
-            if (budget.initialDelay() > 0) {
-                Thread.sleep(budget.initialDelay());
-            }
-
-            while (budget.next()) {
-                if (predicate.test(payload)) {
-                    LOG.debug("Task {} is complete after {} iterations and it is ready to continue",
-                            name, budget.iteration());
-                    completed = true;
-                    break;
-                }
-
-                if (budget.canContinue()) {
-                    Thread.sleep(budget.interval());
-                }
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted {} while waiting for the repeatable task to finish", name);
-            Thread.currentThread().interrupt();
-        } finally {
-            elapsed = budget.elapsed();
-        }
-
-        return completed;
-    }
-
     @Override
     public boolean run(BooleanSupplier supplier) {
         boolean completed = false;


[camel] 01/03: CAMEL-18766: background tasks without maxDuration are reschedulable

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bf6d3a0f75f18564988f785f7cebacf0663103e9
Author: Michael Lück <mi...@lueckonline.net>
AuthorDate: Sat Nov 26 11:37:03 2022 +0100

    CAMEL-18766: background tasks without maxDuration are reschedulable
    
    Make sure the BackgroundTask thread is ended after maxIterations even if
    an unlimited maxDuration is used, but maintain the possibility to retry
    forever if maxIterations has not been set.
    
    Also some code cleanup: encapsulate predicate execution into a
    BooleanSupplier. This means we only need one runTaskWrapper and
    reduces code duplication. For example, we can fix the maxIterations bug
    in a single place.
    
    Also: minor change in comment, Co-authored-by: Nicolas Filotto <es...@users.noreply.github.com>
---
 .../apache/camel/support/task/BackgroundTask.java  | 56 ++++++++--------------
 .../task/BackgroundIterationTimeTaskTest.java      |  3 +-
 2 files changed, 21 insertions(+), 38 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 4909107e496..c7e63fb50e9 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -85,38 +85,21 @@ public class BackgroundTask implements BlockingTask {
         this.name = name;
     }
 
-    private <T> void runTaskWrapper(CountDownLatch latch, Predicate<T> predicate, T payload) {
+    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) {
         LOG.trace("Current latch value: {}", latch.getCount());
-
         if (latch.getCount() == 0) {
             return;
         }
 
         if (!budget.next()) {
             LOG.warn("The task {} does not have more budget to continue running", name);
-
-            return;
-        }
-
-        if (predicate.test(payload)) {
+            result.completed = false;
             latch.countDown();
-            LOG.trace("Task {} has succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
-        }
-    }
-
-    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier) {
-        LOG.trace("Current latch value: {}", latch.getCount());
-        if (latch.getCount() == 0) {
-            return;
-        }
-
-        if (!budget.next()) {
-            LOG.warn("The task {} does not have more budget to continue running", name);
-
             return;
         }
 
         if (supplier.getAsBoolean()) {
+            result.completed = true;
             latch.countDown();
             LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
         }
@@ -124,39 +107,36 @@ public class BackgroundTask implements BlockingTask {
 
     @Override
     public <T> boolean run(Predicate<T> predicate, T payload) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
-                budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS);
-
-        return waitForTaskCompletion(latch, task);
+        return this.run(() -> predicate.test(payload));
     }
 
     @Override
     public boolean run(BooleanSupplier supplier) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
+        final CountDownLatch latch = new CountDownLatch(1);
+        // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which 
+        // is executed by the by ExecutorService
+        // the result will define whether the task executed successfully or has been ended because the budget has been consumed
+        // that will be used as the return value of this method (see description of super.run(...) methods
+        final ExecutionResult result = new ExecutionResult();
+
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, task);
+        waitForTaskCompletion(latch, task);
+        return result.completed;
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
-        boolean completed = false;
+    private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
         try {
             // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
             // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
-                completed = true;
             } else {
                 if (!latch.await(budget.maxDuration(), TimeUnit.MILLISECONDS)) {
                     LOG.debug("Timeout out waiting for the completion of the task");
                 } else {
                     LOG.debug("The task has finished the execution and it is ready to continue");
-
-                    completed = true;
                 }
             }
 
@@ -167,12 +147,14 @@ public class BackgroundTask implements BlockingTask {
         } finally {
             elapsed = budget.elapsed();
         }
-
-        return completed;
     }
 
     @Override
     public Duration elapsed() {
         return elapsed;
     }
+
+    private static class ExecutionResult {
+        private boolean completed;
+    }
 }
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index cd5f7fb83e5..b7f112e1a2c 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -42,7 +42,8 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport {
                         .withMaxIterations(3)
                         .withInterval(Duration.ofSeconds(1))
                         .withInitialDelay(Duration.ZERO)
-                        .withMaxDuration(Duration.ofSeconds(5))
+                        // use unlimited duration so we're sure that the task is really canceled after maxIterations
+                        .withUnlimitedDuration()
                         .build())
                 .build();
 


[camel] 03/03: Refactoring BackgroundTask - made Latch and completed to members

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f7cd5cab83228cd8e7bb77842976d1f9ed6ded43
Author: Michael Lück <gi...@lueckonline.net>
AuthorDate: Tue Nov 29 19:06:35 2022 +0100

    Refactoring BackgroundTask - made Latch and completed to members
    
    As the BackgroundTask is stateful, we can just add the latch and the
    completed flag (introduced for CAMEL-18766) as members of the
    BackgroundTask (see the discussion in GitHub pull request 8785).
---
 .../apache/camel/support/task/BackgroundTask.java  | 30 ++++++++--------------
 .../apache/camel/support/task/BlockingTask.java    |  2 +-
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 035e795b422..6fed38feb10 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
 
 import org.apache.camel.support.task.budget.TimeBoundedBudget;
 import org.apache.camel.support.task.budget.TimeBudget;
@@ -77,7 +76,10 @@ public class BackgroundTask implements BlockingTask {
     private final TimeBudget budget;
     private final ScheduledExecutorService service;
     private final String name;
+    private final CountDownLatch latch = new CountDownLatch(1);
+
     private Duration elapsed = Duration.ZERO;
+    private boolean completed;
 
     BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) {
         this.budget = budget;
@@ -85,7 +87,7 @@ public class BackgroundTask implements BlockingTask {
         this.name = name;
     }
 
-    private void runTaskWrapper(CountDownLatch latch, BooleanSupplier supplier, ExecutionResult result) {
+    private void runTaskWrapper(BooleanSupplier supplier) {
         LOG.trace("Current latch value: {}", latch.getCount());
         if (latch.getCount() == 0) {
             return;
@@ -93,13 +95,13 @@ public class BackgroundTask implements BlockingTask {
 
         if (!budget.next()) {
             LOG.warn("The task {} does not have more budget to continue running", name);
-            result.completed = false;
+            completed = false;
             latch.countDown();
             return;
         }
 
         if (supplier.getAsBoolean()) {
-            result.completed = true;
+            completed = true;
             latch.countDown();
             LOG.trace("Task {} succeeded and the current task won't be schedulable anymore: {}", name, latch.getCount());
         }
@@ -107,21 +109,15 @@ public class BackgroundTask implements BlockingTask {
 
     @Override
     public boolean run(BooleanSupplier supplier) {
-        final CountDownLatch latch = new CountDownLatch(1);
-        // we need a wrapper for the actual result that will be defined in the runTaskWrapper method which 
-        // is executed by the by ExecutorService
-        // the result will define whether the task executed successfully or has been ended because the budget has been consumed
-        // that will be used as the return value of this method (see description of super.run(...) methods
-        final ExecutionResult result = new ExecutionResult();
-
-        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier, result), budget.initialDelay(),
+
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        waitForTaskCompletion(latch, task);
-        return result.completed;
+        waitForTaskCompletion(task);
+        return completed;
     }
 
-    private void waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
+    private void waitForTaskCompletion(Future<?> task) {
         try {
             // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
             // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
@@ -148,8 +144,4 @@ public class BackgroundTask implements BlockingTask {
     public Duration elapsed() {
         return elapsed;
     }
-
-    private static class ExecutionResult {
-        private boolean completed;
-    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
index 757a7bcea20..ec17ea6f980 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java
@@ -37,7 +37,7 @@ public interface BlockingTask extends Task {
      *                   task was interrupted.
      */
     default <T> boolean run(Predicate<T> predicate, T payload) {
-    	return this.run(() -> predicate.test(payload));
+        return this.run(() -> predicate.test(payload));
     }
 
     /**