You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/09 15:21:45 UTC

[camel] branch main updated: CAMEL-17121: converted the default producer cache to use repeatable tasks

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e653b81  CAMEL-17121: converted the default producer cache to use repeatable tasks
e653b81 is described below

commit e653b815839be4fb5b1146a9fd8312a15d997318
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Nov 9 11:51:26 2021 +0100

    CAMEL-17121: converted the default producer cache to use repeatable tasks
---
 .../camel/support/cache/DefaultProducerCache.java  | 40 +++++++++++++---------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 19ebfc5..a5b0509 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support.cache;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -40,6 +41,10 @@ import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,6 +125,23 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
         return source;
     }
 
+    // Waits, checking every 5 ms, until the service has left the starting state or ACQUIRE_WAIT_TIME ms elapse.
+    private void waitForService(StatefulService service) {
+        BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
+                .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+                .withMaxDuration(Duration.ofMillis(ACQUIRE_WAIT_TIME))
+                .withInterval(Duration.ofMillis(5))
+                .build()).build();
+
+        StopWatch watch = new StopWatch();
+        // run() reports success once the predicate holds, so the predicate must be "no longer starting"
+        if (!task.run(() -> !service.isStarting())) {
+            LOG.warn("The producer did not finish starting");
+        }
+        LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", watch.taken(), service,
+                service.getStatus());
+    }
+
     @Override
     public AsyncProducer acquireProducer(Endpoint endpoint) {
         try {
@@ -127,27 +149,13 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
             if (statistics != null) {
                 statistics.onHit(endpoint.getEndpointUri());
             }
+
             // if producer is starting then wait for it to be ready
             if (producer instanceof StatefulService) {
                 StatefulService ss = (StatefulService) producer;
                 if (ss.isStarting()) {
                     LOG.trace("Waiting for producer to finish starting: {}", producer);
-                    StopWatch watch = new StopWatch();
-                    boolean done = false;
-                    while (!done) {
-                        done = !ss.isStarting() || watch.taken() > ACQUIRE_WAIT_TIME;
-                        if (!done) {
-                            Thread.sleep(5);
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Waiting {} ms for producer to finish starting: {} state: {}", watch.taken(),
-                                        producer, ss.getStatus());
-                            }
-                        }
-                    }
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", watch.taken(), producer,
-                                ss.getStatus());
-                    }
+                    waitForService(ss);
                 }
             }
             return producer;