You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/09 15:21:45 UTC
[camel] branch main updated: CAMEL-17121: converted the default
producer cache to use repeatable tasks
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e653b81 CAMEL-17121: converted the default producer cache to use repeatable tasks
e653b81 is described below
commit e653b815839be4fb5b1146a9fd8312a15d997318
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Nov 9 11:51:26 2021 +0100
CAMEL-17121: converted the default producer cache to use repeatable tasks
---
.../camel/support/cache/DefaultProducerCache.java | 40 +++++++++++++---------
1 file changed, 24 insertions(+), 16 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index 19ebfc5..a5b0509 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.support.cache;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
@@ -40,6 +41,10 @@ import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +125,23 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
return source;
}
+ private void waitForService(StatefulService service) {
+ BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
+ .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+ .withMaxDuration(Duration.ofMillis(ACQUIRE_WAIT_TIME))
+ .withInterval(Duration.ofMillis(5))
+ .build())
+ .build();
+
+ StopWatch watch = new StopWatch();
+ if (!task.run(service::isStarting)) {
+ LOG.warn("The producer did not finish starting");
+ }
+
+ LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", watch.taken(), service,
+ service.getStatus());
+ }
+
@Override
public AsyncProducer acquireProducer(Endpoint endpoint) {
try {
@@ -127,27 +149,13 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
if (statistics != null) {
statistics.onHit(endpoint.getEndpointUri());
}
+
// if producer is starting then wait for it to be ready
if (producer instanceof StatefulService) {
StatefulService ss = (StatefulService) producer;
if (ss.isStarting()) {
LOG.trace("Waiting for producer to finish starting: {}", producer);
- StopWatch watch = new StopWatch();
- boolean done = false;
- while (!done) {
- done = !ss.isStarting() || watch.taken() > ACQUIRE_WAIT_TIME;
- if (!done) {
- Thread.sleep(5);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting {} ms for producer to finish starting: {} state: {}", watch.taken(),
- producer, ss.getStatus());
- }
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", watch.taken(), producer,
- ss.getStatus());
- }
+ waitForService(ss);
}
}
return producer;