You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/02/28 17:09:05 UTC

[camel] 01/02: CAMEL-19058: check StatefulServiceInitialization just once

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 07b9c68ea2a9a70cf3a3f42c1045fb9fd0acbe6a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Feb 11 22:25:01 2023 +0100

    CAMEL-19058: check StatefulServiceInitialization just once
    
    This avoids both the type check and the initialization-status check on the hot path
---
 .../camel/support/cache/DefaultProducerCache.java  | 35 --------------------
 .../apache/camel/support/cache/ServicePool.java    | 38 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
index d85fe95105c..17612316e62 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultProducerCache.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.support.cache;
 
-import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -31,7 +30,6 @@ import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.StatefulService;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.SharedInternalProcessor;
@@ -40,10 +38,6 @@ import org.apache.camel.support.DefaultEndpointUtilizationStatistics;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.support.task.BlockingTask;
-import org.apache.camel.support.task.Tasks;
-import org.apache.camel.support.task.budget.Budgets;
-import org.apache.camel.support.task.budget.IterationBoundedBudget;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,9 +46,7 @@ import org.slf4j.LoggerFactory;
  * Default implementation of {@link ProducerCache}.
  */
 public class DefaultProducerCache extends ServiceSupport implements ProducerCache {
-
     private static final Logger LOG = LoggerFactory.getLogger(DefaultProducerCache.class);
-    private static final long ACQUIRE_WAIT_TIME = 30000;
 
     private final CamelContext camelContext;
     private final ProducerServicePool producers;
@@ -125,24 +117,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
         return source;
     }
 
-    private void waitForService(StatefulService service) {
-        BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
-                .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
-                .withMaxDuration(Duration.ofMillis(ACQUIRE_WAIT_TIME))
-                .withInterval(Duration.ofMillis(5))
-                .build())
-                .build();
-
-        if (!task.run(service::isStarting)) {
-            LOG.warn("The producer: {} did not finish starting in {} ms", service, ACQUIRE_WAIT_TIME);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service,
-                    service.getStatus());
-        }
-    }
-
     @Override
     public AsyncProducer acquireProducer(Endpoint endpoint) {
         try {
@@ -150,15 +124,6 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach
             if (statistics != null) {
                 statistics.onHit(endpoint.getEndpointUri());
             }
-
-            // if producer is starting then wait for it to be ready
-            if (producer instanceof StatefulService) {
-                StatefulService ss = (StatefulService) producer;
-                if (ss.isStarting()) {
-                    LOG.trace("Waiting for producer to finish starting: {}", producer);
-                    waitForService(ss);
-                }
-            }
             return producer;
         } catch (Throwable e) {
             throw new FailedToCreateProducerException(endpoint, e);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index fe0b6fe6cf9..6017d1cbb97 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support.cache;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -28,9 +29,14 @@ import java.util.function.Function;
 import org.apache.camel.Endpoint;
 import org.apache.camel.NonManagedService;
 import org.apache.camel.Service;
+import org.apache.camel.StatefulService;
 import org.apache.camel.support.LRUCache;
 import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
 import org.apache.camel.util.function.ThrowingFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,6 +130,24 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
         return s;
     }
 
+    private void waitForService(StatefulService service) {
+        BlockingTask task = Tasks.foregroundTask().withBudget(Budgets.iterationTimeBudget()
+                        .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+                        .withMaxDuration(Duration.ofMillis(30000))
+                        .withInterval(Duration.ofMillis(5))
+                        .build())
+                .build();
+
+        if (!task.run(service::isStarting)) {
+            LOG.warn("The producer: {} did not finish starting in {} ms", service, 30000);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Waited {} ms for producer to finish starting: {} state: {}", task.elapsed().toMillis(), service,
+                    service.getStatus());
+        }
+    }
+
     /**
      * Releases the producer/consumer back to the pool
      *
@@ -237,6 +261,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
                         S tempS = creator.apply(endpoint);
                         endpoint.getCamelContext().addService(tempS, true, true);
                         s = tempS;
+
+                        if (s instanceof StatefulService ss) {
+                            if (ss.isStarting()) {
+                                LOG.trace("Waiting for producer to finish starting: {}", s);
+                                waitForService(ss);
+                            }
+                        }
                     }
                 }
             }
@@ -349,6 +380,13 @@ abstract class ServicePool<S extends Service> extends ServiceSupport implements
                 if (s == null) {
                     s = creator.apply(endpoint);
                     s.start();
+
+                    if (s instanceof StatefulService ss) {
+                        if (ss.isStarting()) {
+                            LOG.trace("Waiting for producer to finish starting: {}", s);
+                            waitForService(ss);
+                        }
+                    }
                 }
             }
             return s;