You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/21 12:04:33 UTC
[camel] 04/11: CAMEL-14596: camel-core - Optimize Enrich/PollEnrich
when cacheSize = -1 to avoid creating endpoints. Introduced
getPrototypeEndpoint API.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 94910c6b9172fd12e509f3be4752334dd0c56fdc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Feb 21 10:18:22 2020 +0100
CAMEL-14596: camel-core - Optimize Enrich/PollEnrich when cacheSize = -1 to avoid creating endpoints. Introduced getPrototypeEndpoint API.
---
.../java/org/apache/camel/processor/Enricher.java | 55 +++++++++++--
.../org/apache/camel/processor/PollEnricher.java | 46 ++++++++++-
.../org/apache/camel/reifier/EnrichReifier.java | 3 +
.../apache/camel/processor/EnrichNoCacheTest.java | 91 ++++++++++++++++++++++
.../camel/processor/PollEnrichNoCacheTest.java | 85 ++++++++++++++++++++
.../org/apache/camel/support/ExchangeHelper.java | 6 ++
6 files changed, 276 insertions(+), 10 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index c038f98..d61550c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
@@ -176,10 +177,18 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
// use dynamic endpoint so calculate the endpoint to use
Object recipient = null;
+ boolean prototype = cacheSize < 0;
try {
recipient = expression.evaluate(exchange, Object.class);
- endpoint = resolveEndpoint(exchange, recipient);
- // acquire the consumer from the cache
+ Endpoint existing = getExistingEndpoint(exchange, recipient);
+ if (existing == null) {
+ endpoint = resolveEndpoint(exchange, recipient, prototype);
+ } else {
+ endpoint = existing;
+ // we have an existing endpoint then its not a prototype scope
+ prototype = false;
+ }
+ // acquire the producer from the cache
producer = producerCache.acquireProducer(endpoint);
} catch (Throwable e) {
if (isIgnoreInvalidEndpoint()) {
@@ -203,10 +212,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
}
// record timing for sending the exchange using the producer
final StopWatch watch = sw;
+ final boolean prototypeEndpoint = prototype;
AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
boolean sync = ap.process(resourceExchange, new AsyncCallback() {
public void done(boolean doneSync) {
- // we only have to handle async completion of the routing slip
+ // we only have to handle async completion
if (doneSync) {
return;
}
@@ -249,6 +259,10 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
} catch (Exception e) {
// ignore
}
+ // and stop prototype endpoints
+ if (prototypeEndpoint) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
callback.done(false);
}
@@ -306,17 +320,46 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
} catch (Exception e) {
// ignore
}
+ // and stop prototype endpoints
+ if (prototypeEndpoint) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
callback.done(true);
return true;
}
- protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient instanceof String) {
+ recipient = ((String) recipient).trim();
+ }
+ if (recipient != null) {
+ // convert to a string type we can work with
+ String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ return exchange.getContext().hasEndpoint(uri);
+ }
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
// trim strings as end users might have added spaces between separators
if (recipient instanceof String) {
- recipient = ((String)recipient).trim();
+ recipient = ((String) recipient).trim();
+ } else if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient != null) {
+ // convert to a string type we can work with
+ recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+
+ if (recipient != null) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+ } else {
+ return null;
}
- return ExchangeHelper.resolveEndpoint(exchange, recipient);
}
/**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 44e8757..de1fea9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.PollingConsumer;
import org.apache.camel.impl.engine.DefaultConsumerCache;
import org.apache.camel.spi.ConsumerCache;
@@ -209,9 +210,17 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
// use dynamic endpoint so calculate the endpoint to use
Object recipient = null;
+ boolean prototype = cacheSize < 0;
try {
recipient = expression.evaluate(exchange, Object.class);
- endpoint = resolveEndpoint(exchange, recipient);
+ Endpoint existing = getExistingEndpoint(exchange, recipient);
+ if (existing == null) {
+ endpoint = resolveEndpoint(exchange, recipient, prototype);
+ } else {
+ endpoint = existing;
+ // we have an existing endpoint then its not a prototype scope
+ prototype = false;
+ }
// acquire the consumer from the cache
consumer = consumerCache.acquirePollingConsumer(endpoint);
} catch (Throwable e) {
@@ -266,6 +275,10 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
} finally {
// return the consumer back to the cache
consumerCache.releasePollingConsumer(endpoint, consumer);
+ // and stop prototype endpoints
+ if (prototype) {
+ ServiceHelper.stopAndShutdownService(endpoint);
+ }
}
// remember current redelivery stats
@@ -332,12 +345,37 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
return true;
}
- protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient instanceof String) {
+ recipient = ((String) recipient).trim();
+ }
+ if (recipient != null) {
+ // convert to a string type we can work with
+ String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ return exchange.getContext().hasEndpoint(uri);
+ }
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
// trim strings as end users might have added spaces between separators
if (recipient instanceof String) {
- recipient = ((String)recipient).trim();
+ recipient = ((String) recipient).trim();
+ } else if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ } else if (recipient != null) {
+ // convert to a string type we can work with
+ recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+
+ if (recipient != null) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+ } else {
+ return null;
}
- return ExchangeHelper.resolveEndpoint(exchange, recipient);
}
/**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index e353f0c..109ffc1 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -41,6 +41,9 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
Enricher enricher = new Enricher(exp);
enricher.setShareUnitOfWork(isShareUnitOfWork);
enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
+ if (definition.getCacheSize() != null) {
+ enricher.setCacheSize(parseInt(definition.getCacheSize()));
+ }
AggregationStrategy strategy = createAggregationStrategy();
if (strategy != null) {
enricher.setAggregationStrategy(strategy);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
new file mode 100644
index 0000000..c37262a
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichNoCacheTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class EnrichNoCacheTest extends ContextTestSupport {
+
+ @Test
+ public void testNoCache() throws Exception {
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ sendBody("foo", "mock:x");
+ sendBody("foo", "mock:y");
+ sendBody("foo", "mock:z");
+ sendBody("bar", "mock:x");
+ sendBody("bar", "mock:y");
+ sendBody("bar", "mock:z");
+
+ // make sure its using an empty producer cache as the cache is disabled
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ Enricher ep = (Enricher) list.get(0);
+ assertNotNull(ep);
+ assertEquals(-1, ep.getCacheSize());
+
+ // check no additional endpoints added as cache was disabled
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ // now send again with mocks which then add endpoints
+
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ assertEquals(4, context.getEndpointRegistry().size());
+
+ sendBody("foo", "mock:x");
+ sendBody("foo", "mock:y");
+ sendBody("foo", "mock:z");
+ sendBody("bar", "mock:x");
+ sendBody("bar", "mock:y");
+ sendBody("bar", "mock:z");
+
+ // should not register as new endpoint so we keep at 4
+ sendBody("dummy", "mock:dummy");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, context.getEndpointRegistry().size());
+ }
+
+ protected void sendBody(String body, String uri) {
+ template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a")
+ .enrich().header("myHeader").cacheSize(-1).end().id("foo");
+ }
+ };
+
+ }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
new file mode 100644
index 0000000..3f323f2
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class PollEnrichNoCacheTest extends ContextTestSupport {
+
+ @Test
+ public void testNoCache() throws Exception {
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ sendBody("foo", "seda:x");
+ sendBody("foo", "seda:y");
+ sendBody("foo", "seda:z");
+ sendBody("bar", "seda:x");
+ sendBody("bar", "seda:y");
+ sendBody("bar", "seda:z");
+
+ // make sure its using an empty producer cache as the cache is disabled
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ PollEnricher ep = (PollEnricher) list.get(0);
+ assertNotNull(ep);
+ assertEquals(-1, ep.getCacheSize());
+
+ // check no additional endpoints added as cache was disabled
+ assertEquals(1, context.getEndpointRegistry().size());
+
+ // now send again and create endpoints
+ template.sendBody("seda:x", "x");
+ template.sendBody("seda:y", "y");
+ template.sendBody("seda:z", "z");
+
+ assertEquals(4, context.getEndpointRegistry().size());
+
+ sendBody("foo", "seda:x");
+ sendBody("foo", "seda:y");
+ sendBody("foo", "seda:z");
+ sendBody("bar", "seda:x");
+ sendBody("bar", "seda:y");
+ sendBody("bar", "seda:z");
+
+ // should not register as new endpoint so we keep at 4
+ sendBody("dummy", "seda:dummy");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, context.getEndpointRegistry().size());
+ }
+
+ protected void sendBody(String body, String uri) {
+ template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a")
+ .pollEnrich().header("myHeader").timeout(0).cacheSize(-1).end().id("foo");
+ }
+ };
+
+ }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index bfac356..e208448 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -89,6 +89,9 @@ public final class ExchangeHelper {
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+ if (value == null) {
+ throw new NoSuchEndpointException("null");
+ }
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;
@@ -110,6 +113,9 @@ public final class ExchangeHelper {
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+ if (value == null) {
+ throw new NoSuchEndpointException("null");
+ }
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;