You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:31 UTC

[camel] 02/11: CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1859881d17a420384037531a55f6b7623d8c1c16
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 04:56:57 2020 +0100

    CAMEL-14596: camel-core - Optimize Recipient List when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../org/apache/camel/processor/RecipientList.java  |  1 +
 .../camel/processor/RecipientListProcessor.java    | 61 +++++++++++++++++++---
 .../camel/processor/RecipientListNoCacheTest.java  | 33 ++++++++++++
 3 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index 4903429..f473dfc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -193,6 +193,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
                 isStopOnAggregateException());
         rlp.setAggregateExecutorService(aggregateExecutorService);
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+        rlp.setCacheSize(getCacheSize());
         rlp.setId(getId());
         rlp.setRouteId(getRouteId());
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index fb5b7c9..3eb6352 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,6 +32,7 @@ import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.ProducerCache;
@@ -66,6 +67,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     private final Iterator<?> iter;
     private boolean ignoreInvalidEndpoints;
     private ProducerCache producerCache;
+    private int cacheSize;
 
     /**
      * Class that represent each step in the recipient list to do
@@ -82,9 +84,10 @@ public class RecipientListProcessor extends MulticastProcessor {
         private final ProducerCache producerCache;
         private final ExchangePattern pattern;
         private volatile ExchangePattern originalPattern;
+        private final boolean prototypeEndpoint;
 
         private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
-                                               Processor prepared, Exchange exchange, ExchangePattern pattern) {
+                                               Processor prepared, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
             this.index = index;
             this.producerCache = producerCache;
             this.endpoint = endpoint;
@@ -92,6 +95,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             this.prepared = prepared;
             this.exchange = exchange;
             this.pattern = pattern;
+            this.prototypeEndpoint = prototypeEndpoint;
         }
 
         @Override
@@ -139,6 +143,10 @@ public class RecipientListProcessor extends MulticastProcessor {
                 }
                 // when we are done we should release back in pool
                 producerCache.releaseProducer(endpoint, producer);
+                // and stop prototype endpoints
+                if (prototypeEndpoint) {
+                    ServiceHelper.stopAndShutdownService(endpoint);
+                }
             } catch (Exception e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
@@ -148,6 +156,8 @@ public class RecipientListProcessor extends MulticastProcessor {
 
     }
 
+    // TODO: camel-bean @RecipientList cacheSize
+
     public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter) {
         super(camelContext, null);
         this.producerCache = producerCache;
@@ -169,6 +179,14 @@ public class RecipientListProcessor extends MulticastProcessor {
         this.iter = iter;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     public boolean isIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
@@ -185,12 +203,21 @@ public class RecipientListProcessor extends MulticastProcessor {
         // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
         int index = 0;
         while (iter.hasNext()) {
+            boolean prototype = cacheSize < 0;
+
             Object recipient = iter.next();
             Endpoint endpoint;
             Producer producer;
             ExchangePattern pattern;
             try {
-                endpoint = resolveEndpoint(exchange, recipient);
+                Endpoint existing = getExistingEndpoint(exchange, recipient);
+                if (existing == null) {
+                    endpoint = resolveEndpoint(exchange, recipient, prototype);
+                } else {
+                    endpoint = existing;
+                    // we have an existing endpoint then its not a prototype scope
+                    prototype = false;
+                }
                 pattern = resolveExchangePattern(recipient);
                 producer = producerCache.acquireProducer(endpoint);
             } catch (Exception e) {
@@ -206,7 +233,7 @@ public class RecipientListProcessor extends MulticastProcessor {
             }
 
             // then create the exchange pair
-            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
+            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype));
         }
 
         return result;
@@ -215,7 +242,8 @@ public class RecipientListProcessor extends MulticastProcessor {
     /**
      * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
      */
-    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer,
+                                                                Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
         Processor prepared = producer;
 
         // copy exchange, and do not share the unit of work
@@ -243,7 +271,7 @@ public class RecipientListProcessor extends MulticastProcessor {
         }
 
         // and create the pair
-        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
+        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern, prototypeEndpoint);
     }
 
     @Override
@@ -263,12 +291,31 @@ public class RecipientListProcessor extends MulticastProcessor {
         return answer;
     }
 
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
             recipient = ((String) recipient).trim();
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
+        }
     }
 
     protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index f4a9575..f396c42 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -59,6 +59,39 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         assertNull(pc);
     }
 
+    @Test
+    public void testNoCacheTwo() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo");
+        sendBody("bar");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+        assertEquals(-1, rl.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
     protected void sendBody(String body) {
         template.sendBodyAndHeader("direct:a", body, "recipientListHeader", "mock:x,mock:y,mock:z");
     }