You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/03/02 18:34:02 UTC
[camel] branch master updated: CAMEL-14644: camel-core - Optimize
dynamic EIPs to only normalize uri once
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 83fa854 CAMEL-14644: camel-core - Optimize dynamic EIPs to only normalize uri once
83fa854 is described below
commit 83fa854f634b99d9e15e4ba39571f4ce63e5cee1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 2 14:20:27 2020 +0100
CAMEL-14644: camel-core - Optimize dynamic EIPs to only normalize uri once
---
.../main/java/org/apache/camel/CamelContext.java | 8 ++-
.../org/apache/camel/ExtendedCamelContext.java | 45 +++++++++++++++
.../apache/camel/spi/NormalizedEndpointUri.java} | 29 +++-------
.../camel/impl/engine/AbstractCamelContext.java | 57 +++++++++++++++---
.../org/apache/camel/impl/engine/EndpointKey.java | 8 ++-
.../java/org/apache/camel/processor/Enricher.java | 51 ++++++++++------
.../org/apache/camel/processor/PollEnricher.java | 51 ++++++++++------
.../camel/processor/RecipientListProcessor.java | 64 ++++++++++++++-------
.../org/apache/camel/processor/RoutingSlip.java | 42 +++++++++++---
.../camel/processor/SendDynamicProcessor.java | 67 +++++++++++++---------
.../org/apache/camel/processor/SendProcessor.java | 8 +--
.../camel/impl/engine/DefaultCamelContextTest.java | 4 +-
.../apache/camel/support/CamelContextHelper.java | 31 ++++++++++
.../org/apache/camel/support/EndpointHelper.java | 19 +++---
.../org/apache/camel/support/ExchangeHelper.java | 7 +++
.../org/apache/camel/support/NormalizedUri.java} | 29 ++++------
16 files changed, 359 insertions(+), 161 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 62e5c50..cc63828 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel;
-import java.util.*;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.camel.spi.CamelContextNameStrategy;
import org.apache.camel.spi.ClassResolver;
@@ -434,8 +438,6 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
*
* @param uri the URI of the endpoint
* @return the endpoint
- *
- * @see #getPrototypeEndpoint(String)
*/
Endpoint getEndpoint(String uri);
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 970d466..60c5458 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -46,6 +46,7 @@ import org.apache.camel.spi.ManagementMBeanAssembler;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ModelToXMLDumper;
import org.apache.camel.spi.NodeIdFactory;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
import org.apache.camel.spi.ProcessorFactory;
@@ -132,6 +133,50 @@ public interface ExtendedCamelContext extends CamelContext {
Endpoint getPrototypeEndpoint(String uri);
/**
+ * Resolves the given name to an {@link Endpoint} of the specified type (scope is prototype).
+ * If the name has a singleton endpoint registered, then the singleton is returned.
+ * Otherwise, a new {@link Endpoint} is created.
+ *
+ * The endpoint is NOT registered in the {@link org.apache.camel.spi.EndpointRegistry} as its prototype scoped,
+ * and therefore expected to be short lived and discarded after use (you must stop and shutdown the
+ * endpoint when no longer in use).
+ *
+ * @param uri the URI of the endpoint
+ * @return the endpoint
+ *
+ * @see #getEndpoint(String)
+ */
+ Endpoint getPrototypeEndpoint(NormalizedEndpointUri uri);
+
+ /**
+ * Is the given endpoint already registered in the {@link org.apache.camel.spi.EndpointRegistry}
+ *
+ * @param uri the URI of the endpoint
+ * @return the registered endpoint or <tt>null</tt> if not registered
+ */
+ Endpoint hasEndpoint(NormalizedEndpointUri uri);
+
+ /**
+ * Resolves the given name to an {@link Endpoint} of the specified type.
+ * If the name has a singleton endpoint registered, then the singleton is returned.
+ * Otherwise, a new {@link Endpoint} is created and registered in the {@link org.apache.camel.spi.EndpointRegistry}.
+ *
+ * @param uri the URI of the endpoint
+ * @return the endpoint
+ *
+ * @see #getPrototypeEndpoint(String)
+ */
+ Endpoint getEndpoint(NormalizedEndpointUri uri);
+
+ /**
+ * Normalizes the given uri.
+ *
+ * @param uri the uri
+ * @return a normalized uri
+ */
+ NormalizedEndpointUri normalizeUri(String uri);
+
+ /**
* Returns the order in which the route inputs was started.
* <p/>
* The order may not be according to the startupOrder defined on the route.
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java
similarity index 55%
copy from core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
copy to core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java
index 8981d44..99cae35 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/NormalizedEndpointUri.java
@@ -14,32 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl.engine;
-
-import org.apache.camel.ValueHolder;
-import org.apache.camel.util.StringHelper;
+package org.apache.camel.spi;
/**
- * Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext},
- * to ensure a consistent lookup.
+ * An Uri which has been normalized.
+ * <p/>
+ * This is intended for internal optimizations or third party EIP or component implementations
+ * that can be optimized to pre normalize endpoints under certain use-cases.
*/
-public final class EndpointKey extends ValueHolder<String> {
-
- public EndpointKey(String uri) {
- this(uri, false);
- }
+public interface NormalizedEndpointUri {
/**
- * Optimized when the uri is already normalized.
+ * Gets the normalized uri
*/
- public EndpointKey(String uri, boolean normalized) {
- super(normalized ? uri : AbstractCamelContext.normalizeEndpointUri(uri));
- StringHelper.notEmpty(uri, "uri");
- }
-
- @Override
- public String toString() {
- return get();
- }
+ String getUri();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index bd95fb1..1094d8f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -119,6 +119,7 @@ import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ModelToXMLDumper;
import org.apache.camel.spi.NodeIdFactory;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
import org.apache.camel.spi.ProcessorFactory;
@@ -149,6 +150,7 @@ import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.LRUCacheFactory;
+import org.apache.camel.support.NormalizedUri;
import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.ProcessorEndpoint;
import org.apache.camel.support.jsse.SSLContextParameters;
@@ -682,6 +684,20 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
}
@Override
+ public Endpoint hasEndpoint(NormalizedEndpointUri uri) {
+ if (endpoints.isEmpty()) {
+ return null;
+ }
+ EndpointKey key;
+ if (uri instanceof EndpointKey) {
+ key = (EndpointKey) uri;
+ } else {
+ key = new EndpointKey(uri.getUri(), true);
+ }
+ return endpoints.get(key);
+ }
+
+ @Override
public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception {
Endpoint oldEndpoint;
@@ -736,16 +752,37 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
}
@Override
+ public NormalizedEndpointUri normalizeUri(String uri) {
+ try {
+ uri = resolvePropertyPlaceholders(uri);
+ uri = normalizeEndpointUri(uri);
+ return new NormalizedUri(uri);
+ } catch (Exception e) {
+ throw new ResolveEndpointFailedException(uri, e);
+ }
+ }
+
+ @Override
public Endpoint getEndpoint(String uri) {
- return doGetEndpoint(uri, false);
+ return doGetEndpoint(uri, false, false);
+ }
+
+ @Override
+ public Endpoint getEndpoint(NormalizedEndpointUri uri) {
+ return doGetEndpoint(uri.getUri(), true, false);
}
@Override
public Endpoint getPrototypeEndpoint(String uri) {
- return doGetEndpoint(uri, true);
+ return doGetEndpoint(uri, false, true);
+ }
+
+ @Override
+ public Endpoint getPrototypeEndpoint(NormalizedEndpointUri uri) {
+ return doGetEndpoint(uri.getUri(), true, true);
}
- protected Endpoint doGetEndpoint(String uri, boolean prototype) {
+ protected Endpoint doGetEndpoint(String uri, boolean normalized, boolean prototype) {
// ensure CamelContext are initialized before we can get an endpoint
init();
@@ -755,17 +792,21 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext
// in case path has property placeholders then try to let property
// component resolve those
- try {
- uri = resolvePropertyPlaceholders(uri);
- } catch (Exception e) {
- throw new ResolveEndpointFailedException(uri, e);
+ if (!normalized) {
+ try {
+ uri = resolvePropertyPlaceholders(uri);
+ } catch (Exception e) {
+ throw new ResolveEndpointFailedException(uri, e);
+ }
}
final String rawUri = uri;
// normalize uri so we can do endpoint hits with minor mistakes and
// parameters is not in the same order
- uri = normalizeEndpointUri(uri);
+ if (!normalized) {
+ uri = normalizeEndpointUri(uri);
+ }
LOG.trace("Getting endpoint with raw uri: {}, normalized uri: {}", rawUri, uri);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
index 8981d44..46be51c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
@@ -17,13 +17,14 @@
package org.apache.camel.impl.engine;
import org.apache.camel.ValueHolder;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.util.StringHelper;
/**
* Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext},
* to ensure a consistent lookup.
*/
-public final class EndpointKey extends ValueHolder<String> {
+public final class EndpointKey extends ValueHolder<String> implements NormalizedEndpointUri {
public EndpointKey(String uri) {
this(uri, false);
@@ -38,6 +39,11 @@ public final class EndpointKey extends ValueHolder<String> {
}
@Override
+ public String getUri() {
+ return get();
+ }
+
+ @Override
public String toString() {
return get();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index d61550c..d29a42b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -27,12 +27,14 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -180,6 +182,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
boolean prototype = cacheSize < 0;
try {
recipient = expression.evaluate(exchange, Object.class);
+ recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, recipient, prototype);
@@ -329,37 +332,47 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
return true;
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof Endpoint) {
- return (Endpoint) recipient;
+ protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
+ return recipient;
} else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
- // convert to a string type we can work with
- String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
- return exchange.getContext().hasEndpoint(uri);
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ String uri;
+ if (recipient instanceof String) {
+ uri = (String) recipient;
+ } else {
+ // convert to a string type we can work with
+ uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+ // optimize and normalize endpoint
+ return ecc.normalizeUri(uri);
}
return null;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof String) {
- recipient = ((String) recipient).trim();
- } else if (recipient instanceof Endpoint) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ if (recipient instanceof Endpoint) {
return (Endpoint) recipient;
- } else if (recipient != null) {
- // convert to a string type we can work with
- recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
}
-
if (recipient != null) {
- return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
- } else {
- return null;
+ if (recipient instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ return ecc.hasEndpoint(nu);
+ } else {
+ String uri = recipient.toString();
+ return exchange.getContext().hasEndpoint(uri);
+ }
}
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
}
/**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index de1fea9..a608b90 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -25,6 +25,7 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.PollingConsumer;
@@ -33,6 +34,7 @@ import org.apache.camel.spi.ConsumerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
@@ -213,6 +215,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
boolean prototype = cacheSize < 0;
try {
recipient = expression.evaluate(exchange, Object.class);
+ recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, recipient, prototype);
@@ -345,37 +348,47 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
return true;
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof Endpoint) {
- return (Endpoint) recipient;
+ protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
+ return recipient;
} else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
- // convert to a string type we can work with
- String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
- return exchange.getContext().hasEndpoint(uri);
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ String uri;
+ if (recipient instanceof String) {
+ uri = (String) recipient;
+ } else {
+ // convert to a string type we can work with
+ uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+ // optimize and normalize endpoint
+ return ecc.normalizeUri(uri);
}
return null;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof String) {
- recipient = ((String) recipient).trim();
- } else if (recipient instanceof Endpoint) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ if (recipient instanceof Endpoint) {
return (Endpoint) recipient;
- } else if (recipient != null) {
- // convert to a string type we can work with
- recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
}
-
if (recipient != null) {
- return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
- } else {
- return null;
+ if (recipient instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ return ecc.hasEndpoint(nu);
+ } else {
+ String uri = recipient.toString();
+ return exchange.getContext().hasEndpoint(uri);
+ }
}
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
}
/**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 3eb6352..66a22d3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -31,10 +31,12 @@ import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -210,6 +212,7 @@ public class RecipientListProcessor extends MulticastProcessor {
Producer producer;
ExchangePattern pattern;
try {
+ recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, recipient, prototype);
@@ -291,41 +294,62 @@ public class RecipientListProcessor extends MulticastProcessor {
return answer;
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof Endpoint) {
- return (Endpoint) recipient;
+ protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
+ return recipient;
} else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
- // convert to a string type we can work with
- String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
- return exchange.getContext().hasEndpoint(uri);
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ String uri;
+ if (recipient instanceof String) {
+ uri = (String) recipient;
+ } else {
+ // convert to a string type we can work with
+ uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+ // optimize and normalize endpoint
+ return ecc.normalizeUri(uri);
}
return null;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof String) {
- recipient = ((String) recipient).trim();
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
}
if (recipient != null) {
- return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
- } else {
- return null;
+ if (recipient instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ return ecc.hasEndpoint(nu);
+ } else {
+ String uri = recipient.toString().trim();
+ return exchange.getContext().hasEndpoint(uri);
+ }
}
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
}
- protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof String) {
- String s = ((String) recipient).trim();
- // see if exchangePattern is a parameter in the url
- s = URISupport.normalizeUri(s);
+ protected ExchangePattern resolveExchangePattern(Object recipient) {
+ String s = null;
+
+ if (recipient instanceof NormalizedEndpointUri) {
+ s = ((NormalizedEndpointUri) recipient).getUri();
+ } else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
+ s = ((String) recipient).trim();
+ }
+ if (s != null) {
return EndpointHelper.resolveExchangePatternFromUrl(s);
}
+
return null;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index e34480a..d88d9c7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
@@ -32,6 +33,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RouteIdAware;
@@ -246,6 +248,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
Endpoint endpoint;
try {
Object recipient = iter.next(exchange);
+ recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, recipient, prototype);
@@ -315,17 +318,41 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
return true;
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof Endpoint) {
- return (Endpoint) recipient;
+ protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
+ return recipient;
} else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
- // convert to a string type we can work with
- String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
- return exchange.getContext().hasEndpoint(uri);
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ String uri;
+ if (recipient instanceof String) {
+ uri = (String) recipient;
+ } else {
+ // convert to a string type we can work with
+ uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+ // optimize and normalize endpoint
+ return ecc.normalizeUri(uri);
+ }
+ return null;
+ }
+
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ if (recipient instanceof Endpoint) {
+ return (Endpoint) recipient;
+ }
+ if (recipient != null) {
+ if (recipient instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ return ecc.hasEndpoint(nu);
+ } else {
+ String uri = recipient.toString();
+ return exchange.getContext().hasEndpoint(uri);
+ }
}
return null;
}
@@ -448,6 +475,7 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
boolean prototype = cacheSize < 0;
try {
Object recipient = iter.next(ex);
+ recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
nextEndpoint = resolveEndpoint(exchange, recipient, prototype);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 11fa8ef..61e8d7c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.ResolveEndpointFailedException;
@@ -30,6 +31,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.SendDynamicAware;
@@ -136,6 +138,15 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
}
Object targetRecipient = staticUri != null ? staticUri : recipient;
+ targetRecipient = prepareRecipient(exchange, targetRecipient);
+ if (targetRecipient == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint");
+ }
+ // no endpoint to send to, so ignore
+ callback.done(true);
+ return true;
+ }
Endpoint existing = getExistingEndpoint(exchange, targetRecipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, targetRecipient, prototype);
@@ -144,14 +155,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
// we have an existing endpoint then its not a prototype scope
prototype = false;
}
- if (endpoint == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint");
- }
- // no endpoint to send to, so ignore
- callback.done(true);
- return true;
- }
destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri());
} catch (Throwable e) {
if (isIgnoreInvalidEndpoint()) {
@@ -238,37 +241,47 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
return ExchangeHelper.resolveScheme(uri);
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof Endpoint) {
- return (Endpoint) recipient;
+ protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
+ if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
+ return recipient;
} else if (recipient instanceof String) {
+ // trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
- // convert to a string type we can work with
- String uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
- return exchange.getContext().hasEndpoint(uri);
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ String uri;
+ if (recipient instanceof String) {
+ uri = (String) recipient;
+ } else {
+ // convert to a string type we can work with
+ uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
+ }
+ // optimize and normalize endpoint
+ return ecc.normalizeUri(uri);
}
return null;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws NoTypeConversionAvailableException {
- // trim strings as end users might have added spaces between separators
- if (recipient instanceof String) {
- recipient = ((String) recipient).trim();
- } else if (recipient instanceof Endpoint) {
+ protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ if (recipient instanceof Endpoint) {
return (Endpoint) recipient;
- } else if (recipient != null) {
- // convert to a string type we can work with
- recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
}
-
if (recipient != null) {
- return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
- } else {
- return null;
+ if (recipient instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ return ecc.hasEndpoint(nu);
+ } else {
+ String uri = recipient.toString();
+ return exchange.getContext().hasEndpoint(uri);
+ }
}
+ return null;
+ }
+
+ protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
}
protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, Endpoint endpoint) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
index 9c5d26e..1abbc08 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -71,12 +71,8 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
this.destination = destination;
this.camelContext = (ExtendedCamelContext) destination.getCamelContext();
this.pattern = pattern;
- try {
- this.destinationExchangePattern = null;
- this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri());
- } catch (URISyntaxException e) {
- throw RuntimeCamelException.wrapRuntimeCamelException(e);
- }
+ this.destinationExchangePattern = null;
+ this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri());
ObjectHelper.notNull(this.camelContext, "camelContext");
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java
index 9a9c5a4..01a29db 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultCamelContextTest.java
@@ -119,7 +119,7 @@ public class DefaultCamelContextTest extends TestSupport {
assertNotNull(endpoint);
try {
- ctx.getEndpoint(null);
+ ctx.getEndpoint((String) null);
fail("Should have thrown exception");
} catch (IllegalArgumentException e) {
// expected
@@ -280,7 +280,7 @@ public class DefaultCamelContextTest extends TestSupport {
assertEquals(1, map.size());
try {
- ctx.hasEndpoint(null);
+ ctx.hasEndpoint((String) null);
fail("Should have thrown exception");
} catch (ResolveEndpointFailedException e) {
// expected
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
index 1983d9a..08f1b46 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java
@@ -30,6 +30,7 @@ import org.apache.camel.NamedNode;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.util.ObjectHelper;
@@ -63,6 +64,21 @@ public final class CamelContextHelper {
}
/**
+ * Returns the mandatory endpoint for the given URI or the
+ * {@link org.apache.camel.NoSuchEndpointException} is thrown
+ */
+ public static Endpoint getMandatoryEndpoint(CamelContext camelContext, NormalizedEndpointUri uri)
+ throws NoSuchEndpointException {
+ ExtendedCamelContext ecc = (ExtendedCamelContext) camelContext;
+ Endpoint endpoint = ecc.getEndpoint(uri);
+ if (endpoint == null) {
+ throw new NoSuchEndpointException(uri.getUri());
+ } else {
+ return endpoint;
+ }
+ }
+
+ /**
* Returns the mandatory endpoint (prototype scope) for the given URI or the
* {@link org.apache.camel.NoSuchEndpointException} is thrown
*/
@@ -78,6 +94,21 @@ public final class CamelContextHelper {
}
/**
+ * Returns the mandatory endpoint (prototype scope) for the given URI or the
+ * {@link org.apache.camel.NoSuchEndpointException} is thrown
+ */
+ public static Endpoint getMandatoryPrototypeEndpoint(CamelContext camelContext, NormalizedEndpointUri uri)
+ throws NoSuchEndpointException {
+ ExtendedCamelContext ecc = (ExtendedCamelContext) camelContext;
+ Endpoint endpoint = ecc.getPrototypeEndpoint(uri);
+ if (endpoint == null) {
+ throw new NoSuchEndpointException(uri.getUri());
+ } else {
+ return endpoint;
+ }
+ }
+
+ /**
* Returns the mandatory endpoint for the given URI and type or the
* {@link org.apache.camel.NoSuchEndpointException} is thrown
*/
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java
index 675cd85..c41edd7 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EndpointHelper.java
@@ -40,6 +40,7 @@ import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import static org.apache.camel.util.StringHelper.after;
/**
@@ -378,17 +379,15 @@ public final class EndpointHelper {
*
* @param url the url
* @return the exchange pattern, or <tt>null</tt> if the url has no <tt>exchangePattern</tt> configured.
- * @throws URISyntaxException is thrown if uri is invalid
*/
- public static ExchangePattern resolveExchangePatternFromUrl(String url) throws URISyntaxException {
- int idx = url.indexOf("?");
- if (idx > 0) {
- url = url.substring(idx + 1);
- }
- Map<String, Object> parameters = URISupport.parseQuery(url, true);
- String pattern = (String) parameters.get("exchangePattern");
- if (pattern != null) {
- return ExchangePattern.asEnum(pattern);
+ public static ExchangePattern resolveExchangePatternFromUrl(String url) {
+ // optimize to use simple string contains check
+ if (url.contains("exchangePattern=InOnly")) {
+ return ExchangePattern.InOnly;
+ } else if (url.contains("exchangePattern=InOut")) {
+ return ExchangePattern.InOut;
+ } else if (url.contains("exchangePattern=InOptionalOut")) {
+ return ExchangePattern.InOptionalOut;
}
return null;
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index e208448..87b7655 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -49,6 +49,7 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.TypeConverter;
import org.apache.camel.WrappedFile;
+import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.IOHelper;
@@ -95,6 +96,9 @@ public final class ExchangeHelper {
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;
+ } else if (value instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
+ endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), nu);
} else {
String uri = value.toString().trim();
endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
@@ -119,6 +123,9 @@ public final class ExchangeHelper {
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;
+ } else if (value instanceof NormalizedEndpointUri) {
+ NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
+ endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), nu);
} else {
String uri = value.toString().trim();
endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), uri);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java b/core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java
similarity index 57%
copy from core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
copy to core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java
index 8981d44..e774a26 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EndpointKey.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/NormalizedUri.java
@@ -14,32 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.impl.engine;
+package org.apache.camel.support;
-import org.apache.camel.ValueHolder;
-import org.apache.camel.util.StringHelper;
+import org.apache.camel.spi.NormalizedEndpointUri;
-/**
- * Key used in {@link DefaultEndpointRegistry} in {@link AbstractCamelContext},
- * to ensure a consistent lookup.
- */
-public final class EndpointKey extends ValueHolder<String> {
+public final class NormalizedUri implements NormalizedEndpointUri {
+
+ private final String uri;
- public EndpointKey(String uri) {
- this(uri, false);
+ public NormalizedUri(String uri) {
+ this.uri = uri;
}
- /**
- * Optimized when the uri is already normalized.
- */
- public EndpointKey(String uri, boolean normalized) {
- super(normalized ? uri : AbstractCamelContext.normalizeEndpointUri(uri));
- StringHelper.notEmpty(uri, "uri");
+ @Override
+ public String getUri() {
+ return uri;
}
@Override
public String toString() {
- return get();
+ return uri;
}
-
}