You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/09 10:24:12 UTC

[camel] branch main updated (868cf90 -> b73b55a)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 868cf90  CAMEL-17121: converted camel-smpp to the repeatable tasks
     new 464acde  CAMEL-17121: added support for configuring the back-off strategy
     new b73b55a  CAMEL-17121: converted camel-soroush to use the repeatable tasks

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../component/SoroushBotAbstractConsumer.java      |  19 +---
 .../soroushbot/component/SoroushBotEndpoint.java   | 103 ++++++++++++---------
 .../component/SoroushBotSendMessageProducer.java   |  70 +++++++++-----
 .../utils/ExponentialBackOffStrategy.java          |  21 +++--
 .../soroushbot/utils/FixedBackOffStrategy.java     |  13 ++-
 .../soroushbot/utils/LinearBackOffStrategy.java    |  19 ++--
 .../apache/camel/support/task/ForegroundTask.java  |   4 +-
 .../task/budget/IterationBoundedBudget.java        |  17 +++-
 .../task/budget/IterationBoundedBudgetBuilder.java |  15 ++-
 .../camel/support/task/budget/IterationBudget.java |   2 +-
 .../task/budget/IterationTimeBoundedBudget.java    |   2 +-
 .../task/budget/backoff}/BackOffStrategy.java      |  15 ++-
 .../task/budget/backoff/FixedBackOffStrategy.java  |  23 ++---
 .../apache/camel/support/task/TaskTestSupport.java |   1 -
 14 files changed, 191 insertions(+), 133 deletions(-)
 rename {components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils => core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff}/BackOffStrategy.java (68%)
 copy components/camel-coap/src/test/java/org/apache/camel/coap/CoAPComponentTLSTest.java => core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/FixedBackOffStrategy.java (70%)

[camel] 01/02: CAMEL-17121: added support for configuring the back-off strategy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 464acde20df180c8bbd8b536afb90c9cfdc4dfcd
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 8 15:42:42 2021 +0100

    CAMEL-17121: added support for configuring the back-off strategy
---
 .../org/apache/camel/support/task/ForegroundTask.java |  4 ++--
 .../support/task/budget/IterationBoundedBudget.java   | 17 +++++++++++++----
 .../task/budget/IterationBoundedBudgetBuilder.java    | 15 ++++++++++++++-
 .../camel/support/task/budget/IterationBudget.java    |  2 +-
 .../task/budget/IterationTimeBoundedBudget.java       |  2 +-
 .../support/task/budget/backoff}/BackOffStrategy.java | 15 +++++++++++++--
 .../task/budget/backoff/FixedBackOffStrategy.java     | 19 ++++++++++++++++---
 .../apache/camel/support/task/TaskTestSupport.java    |  1 -
 8 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
index f7edb36..5b1935b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java
@@ -74,7 +74,7 @@ public class ForegroundTask implements BlockingTask {
             while (budget.next()) {
                 if (predicate.test(payload)) {
                     LOG.info("Task {} is complete after {} iterations and it is ready to continue",
-                            name, budget.iterations());
+                            name, budget.iteration());
                     completed = true;
                     break;
                 }
@@ -103,7 +103,7 @@ public class ForegroundTask implements BlockingTask {
             while (budget.next()) {
                 if (supplier.getAsBoolean()) {
                     LOG.info("Task {} is complete after {} iterations and it is ready to continue",
-                            name, budget.iterations());
+                            name, budget.iteration());
                     completed = true;
 
                     break;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
index 1225846..6cf3574 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
@@ -17,18 +17,27 @@
 
 package org.apache.camel.support.task.budget;
 
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
+import org.apache.camel.support.task.budget.backoff.FixedBackOffStrategy;
+
 public class IterationBoundedBudget implements IterationBudget {
     public static final int UNLIMITED_ITERATIONS = -1;
 
     private final long initialDelay;
-    private final long interval;
     private final int maxIterations;
+    private final BackOffStrategy backOffStrategy;
     private int iterations;
 
     IterationBoundedBudget(long initialDelay, long interval, int maxIterations) {
         this.initialDelay = initialDelay;
-        this.interval = interval;
         this.maxIterations = maxIterations;
+        this.backOffStrategy = new FixedBackOffStrategy(interval);
+    }
+
+    IterationBoundedBudget(long initialDelay, int maxIterations, BackOffStrategy backOffStrategy) {
+        this.initialDelay = initialDelay;
+        this.maxIterations = maxIterations;
+        this.backOffStrategy = backOffStrategy;
     }
 
     @Override
@@ -38,7 +47,7 @@ public class IterationBoundedBudget implements IterationBudget {
 
     @Override
     public long interval() {
-        return interval;
+        return backOffStrategy.calculateInterval(iterations);
     }
 
     @Override
@@ -46,7 +55,7 @@ public class IterationBoundedBudget implements IterationBudget {
         return maxIterations;
     }
 
-    public int iterations() {
+    public int iteration() {
         return iterations;
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudgetBuilder.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudgetBuilder.java
index 4770b29..e410bbe 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudgetBuilder.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudgetBuilder.java
@@ -19,6 +19,8 @@ package org.apache.camel.support.task.budget;
 
 import java.time.Duration;
 
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
+
 /**
  * A helper builder of iteration bounded builders. Provide generic/safe default values, but should be adjusted on a
  * per-case basis. By default, execute the iterations for up to Integer.MAX_VALUE.
@@ -31,6 +33,7 @@ public class IterationBoundedBudgetBuilder implements BudgetBuilder<IterationBud
     protected long initialDelay = DEFAULT_INITIAL_DELAY;
     protected long interval = DEFAULT_INTERVAL;
     protected int maxIterations = DEFAULT_MAX_ITERATIONS;
+    protected BackOffStrategy backOffStrategy;
 
     public IterationBoundedBudgetBuilder withInitialDelay(Duration duration) {
         this.initialDelay = duration.toMillis();
@@ -50,8 +53,18 @@ public class IterationBoundedBudgetBuilder implements BudgetBuilder<IterationBud
         return this;
     }
 
+    public IterationBoundedBudgetBuilder withBackOffStrategy(BackOffStrategy backOffStrategy) {
+        this.backOffStrategy = backOffStrategy;
+
+        return this;
+    }
+
     @Override
     public IterationBoundedBudget build() {
-        return new IterationBoundedBudget(initialDelay, interval, maxIterations);
+        if (backOffStrategy == null) {
+            return new IterationBoundedBudget(initialDelay, interval, maxIterations);
+        }
+
+        return new IterationBoundedBudget(initialDelay, maxIterations, backOffStrategy);
     }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBudget.java
index bab5111..497e8af 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBudget.java
@@ -34,5 +34,5 @@ public interface IterationBudget extends Budget {
      * 
      * @return the current number of iterations
      */
-    int iterations();
+    int iteration();
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationTimeBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationTimeBoundedBudget.java
index 5c95986..702bc28 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationTimeBoundedBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationTimeBoundedBudget.java
@@ -42,7 +42,7 @@ public class IterationTimeBoundedBudget implements IterationBudget, TimeBudget {
     }
 
     @Override
-    public int iterations() {
+    public int iteration() {
         return iterationBudget.maxIterations();
     }
 
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/BackOffStrategy.java
similarity index 68%
copy from components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java
copy to core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/BackOffStrategy.java
index f531719..e8df94c 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/BackOffStrategy.java
@@ -14,8 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.soroushbot.utils;
 
+package org.apache.camel.support.task.budget.backoff;
+
+/**
+ * A back-off strategy determines how the interval between iterations is calculated.
+ */
 public interface BackOffStrategy {
-    void waitBeforeRetry(int retryCount) throws InterruptedException;
+
+    /**
+     * Calculates the back-off interval
+     *
+     * @param  iteration the current iteration count
+     * @return           the interval in milliseconds
+     */
+    long calculateInterval(int iteration);
 }
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/FixedBackOffStrategy.java
similarity index 67%
rename from components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java
rename to core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/FixedBackOffStrategy.java
index f531719..a059277 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/BackOffStrategy.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/backoff/FixedBackOffStrategy.java
@@ -14,8 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.soroushbot.utils;
 
-public interface BackOffStrategy {
-    void waitBeforeRetry(int retryCount) throws InterruptedException;
+package org.apache.camel.support.task.budget.backoff;
+
+/**
+ * A back-off strategy that always returns the same, fixed interval.
+ */
+public class FixedBackOffStrategy implements BackOffStrategy {
+    private final long interval;
+
+    public FixedBackOffStrategy(long interval) {
+        this.interval = interval;
+    }
+
+    @Override
+    public long calculateInterval(int iteration) {
+        return interval;
+    }
 }
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java b/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java
index 7396a95..b51e98a 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java
@@ -55,7 +55,6 @@ public class TaskTestSupport {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
-            System.out.println("Interrupted");
             return false;
         }
 

[camel] 02/02: CAMEL-17121: converted camel-soroush to use the repeatable tasks

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b73b55a039393621fb5dfa27ffbfcd011060d07c
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Nov 9 08:18:30 2021 +0100

    CAMEL-17121: converted camel-soroush to use the repeatable tasks
---
 .../component/SoroushBotAbstractConsumer.java      |  19 +---
 .../soroushbot/component/SoroushBotEndpoint.java   | 103 ++++++++++++---------
 .../component/SoroushBotSendMessageProducer.java   |  70 +++++++++-----
 .../utils/ExponentialBackOffStrategy.java          |  21 +++--
 .../soroushbot/utils/FixedBackOffStrategy.java     |  13 ++-
 .../soroushbot/utils/LinearBackOffStrategy.java    |  19 ++--
 6 files changed, 138 insertions(+), 107 deletions(-)

diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
index 4bfcc8e..858ee66 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotAbstractConsumer.java
@@ -34,8 +34,6 @@ import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.soroushbot.utils.StringUtils.ordinal;
-
 /**
  * this component handle logic for getting message from Soroush server and for each message it calls abstract function
  * {@link SoroushBotAbstractConsumer#sendExchange(Exchange)} each subclass should handle how it will start the
@@ -84,21 +82,14 @@ public abstract class SoroushBotAbstractConsumer extends DefaultConsumer impleme
         connection = new ReconnectableEventSourceListener(client, request, endpoint.getMaxConnectionRetry()) {
             @Override
             protected boolean onBeforeConnect() {
-                int connectionRetry = getConnectionRetry();
+                long interval = endpoint.getBackOffStrategyHelper().calculateInterval(getConnectionRetry());
+
                 try {
-                    endpoint.waitBeforeRetry(connectionRetry);
+                    Thread.sleep(interval);
                 } catch (InterruptedException e) {
-                    return false;
-                }
-                if (!shutdown) {
-                    if (connectionRetry == 0) {
-                        LOG.info("connecting to getMessage from soroush");
-                    } else {
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info("connection is closed. retrying for the {} time(s)... ", ordinal(connectionRetry));
-                        }
-                    }
+                    Thread.currentThread().interrupt();
                 }
+
                 return !shutdown;
             }
 
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotEndpoint.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotEndpoint.java
index 044cde0..68e4dd2 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotEndpoint.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotEndpoint.java
@@ -36,7 +36,6 @@ import org.apache.camel.component.soroushbot.models.SoroushAction;
 import org.apache.camel.component.soroushbot.models.SoroushMessage;
 import org.apache.camel.component.soroushbot.models.response.UploadFileResponse;
 import org.apache.camel.component.soroushbot.service.SoroushService;
-import org.apache.camel.component.soroushbot.utils.BackOffStrategy;
 import org.apache.camel.component.soroushbot.utils.ExponentialBackOffStrategy;
 import org.apache.camel.component.soroushbot.utils.FixedBackOffStrategy;
 import org.apache.camel.component.soroushbot.utils.LinearBackOffStrategy;
@@ -48,6 +47,10 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
 import org.glassfish.jersey.media.multipart.MultiPart;
 import org.glassfish.jersey.media.multipart.file.StreamDataBodyPart;
 import org.slf4j.Logger;
@@ -153,6 +156,20 @@ public class SoroushBotEndpoint extends DefaultEndpoint {
 
     private BackOffStrategy backOffStrategyHelper;
 
+    class Payload {
+        private final InputStream inputStream;
+        private final SoroushMessage message;
+        private final String fileType;
+        private UploadFileResponse response;
+        private Exception exception;
+
+        public Payload(InputStream inputStream, SoroushMessage message, String fileType) {
+            this.inputStream = inputStream;
+            this.message = message;
+            this.fileType = fileType;
+        }
+    }
+
     public SoroushBotEndpoint(String endpointUri, SoroushBotComponent component) {
         super(endpointUri, component);
     }
@@ -235,10 +252,10 @@ public class SoroushBotEndpoint extends DefaultEndpoint {
         if (backOffStrategy.equalsIgnoreCase("fixed")) {
             backOffStrategyHelper = new FixedBackOffStrategy(retryWaitingTime, maxRetryWaitingTime);
         } else if (backOffStrategy.equalsIgnoreCase("linear")) {
-            backOffStrategyHelper = new LinearBackOffStrategy(retryWaitingTime, retryLinearIncrement, maxRetryWaitingTime);
+            backOffStrategyHelper = new LinearBackOffStrategy(retryWaitingTime, retryLinearIncrement);
         } else {
             backOffStrategyHelper
-                    = new ExponentialBackOffStrategy(retryWaitingTime, retryExponentialCoefficient, maxRetryWaitingTime);
+                    = new ExponentialBackOffStrategy(retryWaitingTime, retryExponentialCoefficient);
         }
     }
 
@@ -496,44 +513,48 @@ public class SoroushBotEndpoint extends DefaultEndpoint {
     /**
      * try to upload an inputStream to server
      */
-    private UploadFileResponse uploadToServer(InputStream inputStream, SoroushMessage message, String fileType)
-            throws SoroushException, InterruptedException {
-        javax.ws.rs.core.Response response;
-        //this for handle connection retry if sending request failed.
-        for (int count = 0; count <= maxConnectionRetry; count++) {
-            waitBeforeRetry(count);
-
-            try (MultiPart multipart = new MultiPart()) {
-                multipart.setMediaType(MediaType.MULTIPART_FORM_DATA_TYPE);
-                multipart.bodyPart(new StreamDataBodyPart("file", inputStream, null, MediaType.APPLICATION_OCTET_STREAM_TYPE));
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("try to upload {} for the {} time for message: {}", fileType,
-                            StringUtils.ordinal(count + 1), message);
-                }
-                response = getUploadFileTarget().request(MediaType.APPLICATION_JSON_TYPE)
-                        .post(Entity.entity(multipart, multipart.getMediaType()));
-                return SoroushService.get().assertSuccessful(response, UploadFileResponse.class, message);
-            } catch (IOException | ProcessingException ex) {
-                //if maximum connection retry reached, abort sending the request.
-                if (count == maxConnectionRetry) {
-                    throw new MaximumConnectionRetryReachedException(
-                            "uploading " + fileType + " for message " + message + " failed. Maximum retry limit reached!"
-                                                                     + " aborting upload file and send message",
-                            ex, message);
-                }
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn("uploading " + fileType + " for message " + message + " failed", ex);
-                }
+    private UploadFileResponse uploadToServer(InputStream inputStream, SoroushMessage message, String fileType) {
+        Payload payload = new Payload(inputStream, message, fileType);
+
+        BlockingTask task = Tasks.foregroundTask().withBudget(
+                Budgets.iterationBudget()
+                        .withMaxIterations(maxConnectionRetry)
+                        .withBackOffStrategy(backOffStrategyHelper)
+                        .build())
+                .withName("upload-to-server")
+                .build();
+
+        if (!task.run(this::doUploadToServer, payload)) {
+            LOG.error("Exhausted all retries trying to upload data");
+            throw new MaximumConnectionRetryReachedException(
+                    "Uploading " + fileType + " for message " + message + " failed. Maximum retry limit reached! aborting "
+                                                             + "upload file and send message",
+                    payload.exception, message);
+        }
+
+        return payload.response;
+
+    }
+
+    private boolean doUploadToServer(Payload payload) {
+        try (MultiPart multipart = new MultiPart()) {
+            multipart.setMediaType(MediaType.MULTIPART_FORM_DATA_TYPE);
+            multipart.bodyPart(new StreamDataBodyPart(
+                    "file", payload.inputStream, null,
+                    MediaType.APPLICATION_OCTET_STREAM_TYPE));
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Trying to upload {} for message: {}", payload.fileType, payload.message);
             }
+
+            javax.ws.rs.core.Response response = getUploadFileTarget().request(MediaType.APPLICATION_JSON_TYPE)
+                    .post(Entity.entity(multipart, multipart.getMediaType()));
+            payload.response = SoroushService.get().assertSuccessful(response, UploadFileResponse.class, payload.message);
+            return true;
+        } catch (IOException | ProcessingException | SoroushException ex) {
+            payload.exception = ex;
         }
-        LOG.error(
-                "should never reach this line of code because maxConnectionRetry is greater than -1 and at least the above for must execute single time and");
-        //for backup
-        throw new MaximumConnectionRetryReachedException(
-                "uploading " + fileType + " for message " + message + " failed. Maximum retry limit reached! aborting "
-                                                         + "upload file and send message",
-                message);
+        return false;
     }
 
     /**
@@ -627,8 +648,4 @@ public class SoroushBotEndpoint extends DefaultEndpoint {
                 "can not upload " + type + ": " + fileUrl + " response:" + ((response == null) ? null : response.getStatus()),
                 message);
     }
-
-    public void waitBeforeRetry(int retryCount) throws InterruptedException {
-        backOffStrategyHelper.waitBeforeRetry(retryCount);
-    }
 }
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSendMessageProducer.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSendMessageProducer.java
index 975f238..05b1c23 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSendMessageProducer.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/component/SoroushBotSendMessageProducer.java
@@ -31,11 +31,12 @@ import org.apache.camel.component.soroushbot.service.SoroushService;
 import org.apache.camel.component.soroushbot.utils.MaximumConnectionRetryReachedException;
 import org.apache.camel.component.soroushbot.utils.SoroushException;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.soroushbot.utils.StringUtils.ordinal;
-
 /**
  * this Producer is responsible for URIs of type {@link SoroushAction#sendMessage} to send message to SoroushAPI. it
  * will be instantiated for URIs like "soroush:sendMessage/[token]
@@ -46,6 +47,15 @@ public class SoroushBotSendMessageProducer extends DefaultProducer {
     SoroushBotEndpoint endpoint;
     ObjectMapper objectMapper = new ObjectMapper();
 
+    private static class SoroushMessagePayload {
+        final SoroushMessage message;
+        Exception exception;
+
+        public SoroushMessagePayload(SoroushMessage message) {
+            this.message = message;
+        }
+    }
+
     public SoroushBotSendMessageProducer(SoroushBotEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
@@ -61,35 +71,45 @@ public class SoroushBotSendMessageProducer extends DefaultProducer {
         sendMessage(message);
     }
 
+    private boolean doSendMessage(SoroushMessagePayload payload) {
+        try {
+            Response response = endpoint.getSendMessageTarget().request(MediaType.APPLICATION_JSON_TYPE)
+                    .post(Entity.entity(objectMapper.writeValueAsString(payload.message), MediaType.APPLICATION_JSON_TYPE));
+            SoroushService.get().assertSuccessful(response, payload.message);
+
+            return true;
+        } catch (SoroushException | IOException | ProcessingException e) {
+            LOG.warn("failed to send message: {}", payload.message, e);
+
+            payload.exception = e;
+        }
+
+        return false;
+    }
+
     /**
      * @throws MaximumConnectionRetryReachedException if can not connect to soroush after retry
      *                                                {@link SoroushBotEndpoint#getMaxConnectionRetry()} times
      * @throws SoroushException                       if soroush response code wasn't 200
      */
-    private void sendMessage(SoroushMessage message)
-            throws SoroushException, MaximumConnectionRetryReachedException, InterruptedException {
-        Response response;
-        // this for is responsible to handle maximum connection retry.
-        for (int count = 0; count <= endpoint.getMaxConnectionRetry(); count++) {
-            endpoint.waitBeforeRetry(count);
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("sending message for {} time(s). message: {}", ordinal(count + 1), message);
-                }
-                response = endpoint.getSendMessageTarget().request(MediaType.APPLICATION_JSON_TYPE)
-                        .post(Entity.entity(objectMapper.writeValueAsString(message), MediaType.APPLICATION_JSON_TYPE));
-                SoroushService.get().assertSuccessful(response, message);
-                return;
-            } catch (IOException | ProcessingException ex) {
-                if (count == endpoint.getMaxConnectionRetry()) {
-                    throw new MaximumConnectionRetryReachedException(
-                            "failed to send message. maximum retry limit reached. aborting... message: " + message, ex,
-                            message);
-                }
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn("failed to send message: {}", message, ex);
-                }
+    private void sendMessage(SoroushMessage message) throws MaximumConnectionRetryReachedException {
+
+        BlockingTask task = Tasks.foregroundTask()
+                .withBudget(Budgets.iterationBudget()
+                        .withMaxIterations(endpoint.getMaxConnectionRetry() + 1)
+                        .withBackOffStrategy(endpoint.getBackOffStrategyHelper())
+                        .build())
+                .withName("send-message")
+                .build();
+
+        SoroushMessagePayload payload = new SoroushMessagePayload(message);
 
+        if (!task.run(this::doSendMessage, payload)) {
+            if (payload.exception != null) {
+                throw new MaximumConnectionRetryReachedException(
+                        "Failed to send message. maximum retry limit reached. aborting... message: "
+                                                                 + message,
+                        payload.exception, message);
             }
         }
     }
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/ExponentialBackOffStrategy.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/ExponentialBackOffStrategy.java
index 3f30ca6..648885a 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/ExponentialBackOffStrategy.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/ExponentialBackOffStrategy.java
@@ -16,23 +16,26 @@
  */
 package org.apache.camel.component.soroushbot.utils;
 
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
+
 public class ExponentialBackOffStrategy implements BackOffStrategy {
-    Long baseRetryTimeOut;
-    Long coefficient;
-    //Long maxRetryWaitingTime;
+    private final long baseRetryTimeOut;
+    private final long coefficient;
 
-    public ExponentialBackOffStrategy(Long baseRetryTimeOut, Long coefficient, Long maxRetryWaitingTime) {
+    public ExponentialBackOffStrategy(Long baseRetryTimeOut, Long coefficient) {
         this.baseRetryTimeOut = baseRetryTimeOut;
         this.coefficient = coefficient;
-        //this.maxRetryWaitingTime = maxRetryWaitingTime;
     }
 
     @Override
-    public void waitBeforeRetry(int retryCount) throws InterruptedException {
+    public long calculateInterval(int iteration) {
         //the first and second request do not need wait
-        retryCount -= 2;
-        if (retryCount >= 0) {
-            Thread.sleep((long) (baseRetryTimeOut * Math.pow(coefficient, retryCount)));
+        if (iteration > 2) {
+            long interval = (long) (baseRetryTimeOut * Math.pow(coefficient, iteration - 2));
+
+            return interval;
         }
+
+        return 0;
     }
 }
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/FixedBackOffStrategy.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/FixedBackOffStrategy.java
index fe37f45..9ef5c18 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/FixedBackOffStrategy.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/FixedBackOffStrategy.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.soroushbot.utils;
 
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
+
 public class FixedBackOffStrategy implements BackOffStrategy {
-    Long backOffTime;
-    Long maxRetryWaitingTime;
+    private final long backOffTime;
+    private final long maxRetryWaitingTime;
 
     public FixedBackOffStrategy(Long backOffTime, Long maxRetryWaitingTime) {
         this.backOffTime = backOffTime;
@@ -26,10 +28,7 @@ public class FixedBackOffStrategy implements BackOffStrategy {
     }
 
     @Override
-    public void waitBeforeRetry(int retryCount) throws InterruptedException {
-        //the first and second request do not need wait
-        if (retryCount >= 2L) {
-            Thread.sleep(Math.min(maxRetryWaitingTime, backOffTime));
-        }
+    public long calculateInterval(int iteration) {
+        return Math.min(maxRetryWaitingTime, backOffTime);
     }
 }
diff --git a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/LinearBackOffStrategy.java b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/LinearBackOffStrategy.java
index ee815ee..3bf2913 100644
--- a/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/LinearBackOffStrategy.java
+++ b/components/camel-soroush/src/main/java/org/apache/camel/component/soroushbot/utils/LinearBackOffStrategy.java
@@ -16,23 +16,24 @@
  */
 package org.apache.camel.component.soroushbot.utils;
 
+import org.apache.camel.support.task.budget.backoff.BackOffStrategy;
+
 public class LinearBackOffStrategy implements BackOffStrategy {
-    Long retryWaitingTime;
-    Long increment;
-    //Long maxRetryWaitingTime;
+    private final long retryWaitingTime;
+    private final long increment;
 
-    public LinearBackOffStrategy(Long retryWaitingTime, Long increment, Long maxRetryWaitingTime) {
+    public LinearBackOffStrategy(Long retryWaitingTime, Long increment) {
         this.retryWaitingTime = retryWaitingTime;
         this.increment = increment;
-        //this.maxRetryWaitingTime = maxRetryWaitingTime;
     }
 
     @Override
-    public void waitBeforeRetry(int retryCount) throws InterruptedException {
+    public long calculateInterval(int iteration) {
         //the first and second request do not need wait
-        retryCount -= 2;
-        if (retryCount >= 0) {
-            Thread.sleep(retryWaitingTime + increment * retryCount);
+        if (iteration > 2) {
+            return retryWaitingTime + increment * (iteration - 2);
         }
+
+        return 0;
     }
 }