You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/28 17:09:05 UTC
[camel] 01/02: CAMEL-19058: check StatefulServiceInitialization just once
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 07b9c68ea2a9a70cf3a3f42c1045fb9fd0acbe6a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Feb 11 22:25:01 2023 +0100
CAMEL-19058: check StatefulServiceInitialization just once
This avoids both the type check and the initialization-status check on the hot path
---
.../camel/support/cache/DefaultProducerCache.java | 35 --------------------
.../apache/camel/support/cache/ServicePool.java | 38 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 35 deletions(-)
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index d85fe95105c..17612316e62 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.support.cache;
-import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
@@ -31,7 +30,6 @@ import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.StatefulService;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.SharedInternalProcessor;
@@ -40,10 +38,6 @@ import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.support.task.BlockingTask;
-import org.apache.camel.support.task.Tasks;
-import org.apache.camel.support.task.budget.Budgets;
-import org.apache.camel.support.task.budget.IterationBoundedBudget;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +46,7 @@ import org.slf4j.LoggerFactory;
* Default implementation of {@link ProducerCache}.
*/
public class DefaultProducerCache extends ServiceSupport implements ProducerCache {
-
private static final Logger LOG = LoggerFactory.getLogger(DefaultProducerCache.class);
- private static final long ACQUIRE_WAIT_TIME = 30000;
private final CamelContext camelContext;
private final ProducerServicePool producers;
@@ -125,24 +117,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
return source;
}
- private void waitForService(StatefulService service) {
- BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
- .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
- .withMaxDuration(Duration.ofMillis(ACQUIRE_WAIT_TIME))
- .withInterval(Duration.ofMillis(5))
- .build())
- .build();
-
- if (!task.run(service::isStarting)) {
- LOG.warn("The producer: {} did not finish starting in {} ms", service, ACQUIRE_WAIT_TIME);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service,
- service.getStatus());
- }
- }
-
@Override
public AsyncProducer acquireProducer(Endpoint endpoint) {
try {
@@ -150,15 +124,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
if (statistics != null) {
statistics.onHit(endpoint.getEndpointUri());
}
-
- // if producer is starting then wait for it to be ready
- if (producer instanceof StatefulService) {
- StatefulService ss = (StatefulService) producer;
- if (ss.isStarting()) {
- LOG.trace("Waiting for producer to finish starting: {}", producer);
- waitForService(ss);
- }
- }
return producer;
} catch (Throwable e) {
throw new FailedToCreateProducerException(endpoint, e);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index fe0b6fe6cf9..6017d1cbb97 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.support.cache;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -28,9 +29,14 @@ import java.util.function.Function;
import org.apache.camel.Endpoint;
import org.apache.camel.NonManagedService;
import org.apache.camel.Service;
+import org.apache.camel.StatefulService;
import org.apache.camel.support.LRUCache;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
import org.apache.camel.util.function.ThrowingFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,6 +130,24 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
return s;
}
+ private void waitForService(StatefulService service) {
+ BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
+ .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+ .withMaxDuration(Duration.ofMillis(30000))
+ .withInterval(Duration.ofMillis(5))
+ .build())
+ .build();
+
+ if (!task.run(service::isStarting)) {
+ LOG.warn("The producer: {} did not finish starting in {} ms", service, 30000);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service,
+ service.getStatus());
+ }
+ }
+
/**
* Releases the producer/consumer back to the pool
*
@@ -237,6 +261,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
S tempS = creator.apply(endpoint);
endpoint.getCamelContext().addService(tempS, true, true);
s = tempS;
+
+ if (s instanceof StatefulService ss) {
+ if (ss.isStarting()) {
+ LOG.trace("Waiting for producer to finish starting: {}", s);
+ waitForService(ss);
+ }
+ }
}
}
}
@@ -349,6 +380,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
if (s == null) {
s = creator.apply(endpoint);
s.start();
+
+ if (s instanceof StatefulService ss) {
+ if (ss.isStarting()) {
+ LOG.trace("Waiting for producer to finish starting: {}", s);
+ waitForService(ss);
+ }
+ }
}
}
return s;