You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:33 UTC

[camel] 04/11: CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 94910c6b9172fd12e509f3be4752334dd0c56fdc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:18:22 2020 +0100

    CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
 .../java/org/apache/camel/processor/Enricher.java  | 55 +++++++++++--
 .../org/apache/camel/processor/PollEnricher.java   | 46 ++++++++++-
 .../org/apache/camel/reifier/EnrichReifier.java    |  3 +
 .../apache/camel/processor/EnrichNoCacheTest.java  | 91 ++++++++++++++++++++++
 .../camel/processor/PollEnrichNoCacheTest.java     | 85 ++++++++++++++++++++
 .../org/apache/camel/support/ExchangeHelper.java   |  6 ++
 6 files changed, 276 insertions(+), 10 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index c038f98..d61550c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.impl.engine.DefaultProducerCache;
 import org.apache.camel.impl.engine.EmptyProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
@@ -176,10 +177,18 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
 
         // use dynamic endpoint so calculate the endpoint to use
         Object recipient = null;
+        boolean prototype = cacheSize < 0;
         try {
             recipient = expression.evaluate(exchange, Object.class);
-            endpoint = resolveEndpoint(exchange, recipient);
-            // acquire the consumer from the cache
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
+            // acquire the producer from the cache
             producer = producerCache.acquireProducer(endpoint);
         } catch (Throwable e) {
             if (isIgnoreInvalidEndpoint()) {
@@ -203,10 +212,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         }
         // record timing for sending the exchange using the producer
         final StopWatch watch = sw;
+        final boolean prototypeEndpoint = prototype;
         AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
         boolean sync = ap.process(resourceExchange, new AsyncCallback() {
             public void done(boolean doneSync) {
-                // we only have to handle async completion of the routing slip
+                // we only have to handle async completion
                 if (doneSync) {
                     return;
                 }
@@ -249,6 +259,10 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
                 } catch (Exception e) {
                     // ignore
                 }
+                // and stop prototype endpoints
+                if (prototypeEndpoint) {
+                    ServiceHelper.stopAndShutdownService(endpoint);
+                }
 
                 callback.done(false);
             }
@@ -306,17 +320,46 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         } catch (Exception e) {
             // ignore
         }
+        // and stop prototype endpoints
+        if (prototypeEndpoint) {
+            ServiceHelper.stopAndShutdownService(endpoint);
+        }
 
         callback.done(true);
         return true;
     }
 
-    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
-            recipient = ((String)recipient).trim();
+            recipient = ((String) recipient).trim();
+        } else if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient != null) {
+            // convert to a string type we can work with
+            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+        }
+
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 44e8757..de1fea9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.engine.DefaultConsumerCache;
 import org.apache.camel.spi.ConsumerCache;
@@ -209,9 +210,17 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
 
         // use dynamic endpoint so calculate the endpoint to use
         Object recipient = null;
+        boolean prototype = cacheSize < 0;
         try {
             recipient = expression.evaluate(exchange, Object.class);
-            endpoint = resolveEndpoint(exchange, recipient);
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
             // acquire the consumer from the cache
             consumer = consumerCache.acquirePollingConsumer(endpoint);
         } catch (Throwable e) {
@@ -266,6 +275,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         } finally {
             // return the consumer back to the cache
             consumerCache.releasePollingConsumer(endpoint, consumer);
+            // and stop prototype endpoints
+            if (prototype) {
+                ServiceHelper.stopAndShutdownService(endpoint);
+            }
         }
 
         // remember current redelivery stats
@@ -332,12 +345,37 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         return true;
     }
 
-    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        if (recipient != null) {
+            // convert to a string type we can work with
+            String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+            return exchange.getContext().hasEndpoint(uri);
+        }
+        return null;
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
         // trim strings as end users might have added spaces between separators
         if (recipient instanceof String) {
-            recipient = ((String)recipient).trim();
+            recipient = ((String) recipient).trim();
+        } else if (recipient instanceof Endpoint) {
+            return (Endpoint) recipient;
+        } else if (recipient != null) {
+            // convert to a string type we can work with
+            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+        }
+
+        if (recipient != null) {
+            return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+        } else {
+            return null;
         }
-        return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 
     /**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index e353f0c..109ffc1 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -41,6 +41,9 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
         Enricher enricher = new Enricher(exp);
         enricher.setShareUnitOfWork(isShareUnitOfWork);
         enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
+        if (definition.getCacheSize() != null) {
+            enricher.setCacheSize(parseInt(definition.getCacheSize()));
+        }
         AggregationStrategy strategy = createAggregationStrategy();
         if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
new file mode 100644
index 0000000..c37262a
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class EnrichNoCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        Enricher ep = (Enricher) list.get(0);
+        assertNotNull(ep);
+        assertEquals(-1, ep.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again with mocks which then add endpoints
+
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "mock:x");
+        sendBody("foo", "mock:y");
+        sendBody("foo", "mock:z");
+        sendBody("bar", "mock:x");
+        sendBody("bar", "mock:y");
+        sendBody("bar", "mock:z");
+
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "mock:dummy");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a")
+                        .enrich().header("myHeader").cacheSize(-1).end().id("foo");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
new file mode 100644
index 0000000..3f323f2
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class PollEnrichNoCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testNoCache() throws Exception {
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        sendBody("foo", "seda:x");
+        sendBody("foo", "seda:y");
+        sendBody("foo", "seda:z");
+        sendBody("bar", "seda:x");
+        sendBody("bar", "seda:y");
+        sendBody("bar", "seda:z");
+
+        // make sure its using an empty producer cache as the cache is disabled
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        PollEnricher ep = (PollEnricher) list.get(0);
+        assertNotNull(ep);
+        assertEquals(-1, ep.getCacheSize());
+
+        // check no additional endpoints added as cache was disabled
+        assertEquals(1, context.getEndpointRegistry().size());
+
+        // now send again and create endpoints
+        template.sendBody("seda:x", "x");
+        template.sendBody("seda:y", "y");
+        template.sendBody("seda:z", "z");
+
+        assertEquals(4, context.getEndpointRegistry().size());
+
+        sendBody("foo", "seda:x");
+        sendBody("foo", "seda:y");
+        sendBody("foo", "seda:z");
+        sendBody("bar", "seda:x");
+        sendBody("bar", "seda:y");
+        sendBody("bar", "seda:z");
+
+        // should not register as new endpoint so we keep at 4
+        sendBody("dummy", "seda:dummy");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, context.getEndpointRegistry().size());
+    }
+
+    protected void sendBody(String body, String uri) {
+        template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a")
+                        .pollEnrich().header("myHeader").timeout(0).cacheSize(-1).end().id("foo");
+            }
+        };
+
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index bfac356..e208448 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -89,6 +89,9 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        if (value == null) {
+            throw new NoSuchEndpointException("null");
+        }
         Endpoint endpoint;
         if (value instanceof Endpoint) {
             endpoint = (Endpoint) value;
@@ -110,6 +113,9 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        if (value == null) {
+            throw new NoSuchEndpointException("null");
+        }
         Endpoint endpoint;
         if (value instanceof Endpoint) {
             endpoint = (Endpoint) value;