You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/13 14:37:52 UTC
[01/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic
uris.
Repository: camel
Updated Branches:
refs/heads/master 71b43bc83 -> fe5960aef
CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9fd4d549
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9fd4d549
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9fd4d549
Branch: refs/heads/master
Commit: 9fd4d549056c12754c0fa76e253e00580e8ceb7a
Parents: 71b43bc
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 12 20:54:52 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 08:14:47 2015 +0200
----------------------------------------------------------------------
.../org/apache/camel/impl/ConsumerCache.java | 1 -
.../camel/model/PollEnrichDefinition.java | 62 ++++++++++---
.../apache/camel/model/ProcessorDefinition.java | 31 +++++++
.../apache/camel/processor/PollEnricher.java | 93 +++++++++++++++++---
.../apache/camel/processor/SendProcessor.java | 1 -
.../enricher/PollEnrichExpressionTest.java | 47 ++++++++++
6 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
index 6f60f46..d957efe 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
@@ -58,7 +58,6 @@ public class ConsumerCache extends ServiceSupport {
this(source, camelContext, cache, camelContext.getPollingConsumerServicePool());
}
-
public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache, ServicePool<Endpoint, PollingConsumer> pool) {
this.camelContext = camelContext;
this.consumers = cache;
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 7b6f9d1..a5d7cb1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,12 +19,16 @@ package org.apache.camel.model;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
+import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -41,6 +45,8 @@ import org.apache.camel.util.ObjectHelper;
@XmlRootElement(name = "pollEnrich")
@XmlAccessorType(XmlAccessType.FIELD)
public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
+ @XmlElementRef
+ private ExpressionDefinition expression;
@XmlAttribute(name = "uri")
private String resourceUri;
// TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
@@ -94,24 +100,33 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) {
- throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint");
+ if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
+ throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
}
// lookup endpoint
- Endpoint endpoint;
+ PollingConsumer consumer = null;
if (resourceUri != null) {
- endpoint = routeContext.resolveEndpoint(resourceUri);
- } else {
- endpoint = routeContext.resolveEndpoint(null, resourceRef);
+ Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
+ consumer = endpoint.createPollingConsumer();
+ } else if (resourceRef != null) {
+ Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
+ consumer = endpoint.createPollingConsumer();
}
+ // if no timeout then we should block, and therefore use a negative timeout
+ long time = timeout != null ? timeout : -1;
+
+ // create the expression if any was configured
+ Expression exp = createResourceExpression(routeContext);
+
PollEnricher enricher;
- if (timeout != null) {
- enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout);
+ if (exp != null) {
+ enricher = new PollEnricher(null, exp, time);
+ } else if (consumer != null) {
+ enricher = new PollEnricher(null, consumer, time);
} else {
- // if no timeout then we should block, and there use a negative timeout
- enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1);
+ throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
}
AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -152,6 +167,20 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return strategy;
}
+ /**
+ * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
+ *
+ * @param routeContext the route context
+ * @return the created expression, or <tt>null</tt> if no expression configured
+ */
+ protected Expression createResourceExpression(RouteContext routeContext) {
+ if (expression != null) {
+ return expression.createExpression(routeContext);
+ } else {
+ return null;
+ }
+ }
+
public String getResourceUri() {
return resourceUri;
}
@@ -257,4 +286,17 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
public void setAggregateOnException(Boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
+
+ public ExpressionDefinition getExpression() {
+ return expression;
+ }
+
+ /**
+ * Sets an expression to use for dynamically computing the endpoint to poll from.
+ * <p/>
+ * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
+ */
+ public void setExpression(ExpressionDefinition expression) {
+ this.expression = expression;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index c84ea47..b525c0f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3453,6 +3453,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
}
/**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @param expression to use an expression to dynamically compute the endpoint to poll from
+ * @param timeout timeout in millis to wait at most for data to be available.
+ * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+ * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+ * an exception was thrown.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(Expression expression, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
+ PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+ pollEnrich.setExpression(new ExpressionDefinition(expression));
+ pollEnrich.setTimeout(timeout);
+ pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
+ pollEnrich.setAggregateOnException(aggregateOnException);
+ addOutput(pollEnrich);
+ return (Type) this;
+ }
+
+ /**
* Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
* a callback when the {@link org.apache.camel.Exchange} has finished being processed.
* The hook invoke callbacks for either onComplete or onFailure.
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9cbca74..ab313fb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -18,11 +18,15 @@ package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.ConsumerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
@@ -46,12 +50,15 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
*
* @see Enricher
*/
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
+ private CamelContext camelContext;
+ private ConsumerCache consumerCache;
private String id;
private AggregationStrategy aggregationStrategy;
- private PollingConsumer consumer;
+ private final PollingConsumer consumer;
+ private final Expression expression;
private long timeout;
private boolean aggregateOnException;
@@ -77,9 +84,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
this.aggregationStrategy = aggregationStrategy;
this.consumer = consumer;
+ this.expression = null;
this.timeout = timeout;
}
+ /**
+ * Creates a new {@link PollEnricher}.
+ *
+ * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+ * @param expression expression to use to compute the endpoint to poll from.
+ * @param timeout timeout in millis
+ */
+ public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
+ this.aggregationStrategy = aggregationStrategy;
+ this.expression = expression;
+ this.consumer = null;
+ this.timeout = timeout;
+ }
+
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public String getId() {
return id;
}
@@ -89,7 +119,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
}
public Endpoint getEndpoint() {
- return consumer.getEndpoint();
+ return consumer != null ? consumer.getEndpoint() : null;
}
public AggregationStrategy getAggregationStrategy() {
@@ -162,17 +192,35 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
return true;
}
+ // which consumer to use
+ PollingConsumer target = consumer;
+ Endpoint endpoint = null;
+
+ // use dynamic endpoint so calculate the endpoint to use
+ if (expression != null) {
+ try {
+ Object recipient = expression.evaluate(exchange, Object.class);
+ endpoint = resolveEndpoint(exchange, recipient);
+ // acquire the consumer from the cache
+ target = consumerCache.acquirePollingConsumer(endpoint);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ }
+
Exchange resourceExchange;
try {
if (timeout < 0) {
- LOG.debug("Consumer receive: {}", consumer);
+ LOG.debug("Consumer receive: {}", target);
resourceExchange = consumer.receive();
} else if (timeout == 0) {
- LOG.debug("Consumer receiveNoWait: {}", consumer);
- resourceExchange = consumer.receiveNoWait();
+ LOG.debug("Consumer receiveNoWait: {}", target);
+ resourceExchange = target.receiveNoWait();
} else {
- LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
- resourceExchange = consumer.receive(timeout);
+ LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
+ resourceExchange = target.receive(timeout);
}
if (resourceExchange == null) {
@@ -184,6 +232,11 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
callback.done(true);
return true;
+ } finally {
+ // return the consumer back to the cache
+ if (expression != null) {
+ consumerCache.releasePollingConsumer(endpoint, target);
+ }
}
try {
@@ -209,9 +262,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
- exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+ exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
} else {
- exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+ exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
}
} catch (Throwable e) {
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -223,6 +276,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
return true;
}
+ protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof String) {
+ recipient = ((String)recipient).trim();
+ }
+ return ExchangeHelper.resolveEndpoint(exchange, recipient);
+ }
+
/**
* Strategy to pre check polling.
* <p/>
@@ -251,11 +312,19 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
}
protected void doStart() throws Exception {
- ServiceHelper.startServices(aggregationStrategy, consumer);
+ if (expression != null && consumerCache == null) {
+ // create consumer cache if we use dynamic expressions for computing the endpoints to poll
+ consumerCache = new ConsumerCache(this, getCamelContext());
+ }
+ ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumer, aggregationStrategy);
+ ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+ }
+
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index 884f674..3ab90d6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -118,7 +118,6 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra
return true;
}
-
// we should preserve existing MEP so remember old MEP
// if you want to permanently to change the MEP then use .setExchangePattern in the DSL
final ExchangePattern existingPattern = exchange.getPattern();
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
new file mode 100644
index 0000000..4e983a7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionTest extends ContextTestSupport {
+
+ public void testPollEnricExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+
+ template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+ template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .pollEnrich(header("source"), 1000, null, false)
+ .to("mock:result");
+ }
+ };
+ }
+}
[03/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic
uris.
Posted by da...@apache.org.
CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b4af69c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b4af69c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b4af69c
Branch: refs/heads/master
Commit: 1b4af69c1bb2b03057feffc4ddd4e84baa2fba0b
Parents: b5be4d6
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 10:42:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:42:20 2015 +0200
----------------------------------------------------------------------
.../apache/camel/impl/DefaultCamelContext.java | 1 +
.../apache/camel/impl/EmptyConsumerCache.java | 76 ++++++++++++++++++++
.../camel/model/PollEnrichDefinition.java | 24 +++++++
.../apache/camel/processor/PollEnricher.java | 21 +++++-
.../processor/RecipientListNoCacheTest.java | 2 +-
.../PollEnrichExpressionNoCacheTest.java | 47 ++++++++++++
.../enricher/PollEnrichExpressionTest.java | 6 +-
.../camel/spring/processor/pollEnricherRef.xml | 8 +--
8 files changed, 177 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7056932..c24674e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -2720,6 +2720,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
// special for executorServiceManager as want to stop it manually
doAddService(executorServiceManager, false);
addService(producerServicePool);
+ addService(pollingConsumerServicePool);
addService(inflightRepository);
addService(asyncProcessorAwaitManager);
addService(shutdownStrategy);
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
new file mode 100644
index 0000000..219371a
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.IsSingleton;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link ConsumerCache} which is always empty and does not cache any {@link org.apache.camel.PollingConsumer}s.
+ */
+public class EmptyConsumerCache extends ConsumerCache {
+
+ public EmptyConsumerCache(Object source, CamelContext camelContext) {
+ super(source, camelContext, 0);
+ }
+
+ @Override
+ public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
+ // always create a new consumer
+ PollingConsumer answer;
+ try {
+ answer = endpoint.createPollingConsumer();
+ boolean singleton = true;
+ if (answer instanceof IsSingleton) {
+ singleton = ((IsSingleton) answer).isSingleton();
+ }
+ if (getCamelContext().isStartingRoutes() && singleton) {
+ // if we are currently starting a route, then add as service and enlist in JMX
+ // - but do not enlist non-singletons in JMX
+ // - note addService will also start the service
+ getCamelContext().addService(answer);
+ } else {
+ // must then start service so consumer is ready to be used
+ ServiceHelper.startService(answer);
+ }
+ } catch (Exception e) {
+ throw new FailedToCreateConsumerException(endpoint, e);
+ }
+ return answer;
+ }
+
+ @Override
+ public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
+ // stop and shut down the consumer as it is not cached or reused
+ try {
+ ServiceHelper.stopAndShutdownService(pollingConsumer);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EmptyConsumerCache for source: " + getSource();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index eb1247b..18801d4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -52,6 +52,8 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
private Boolean aggregateOnException;
@XmlTransient
private AggregationStrategy aggregationStrategy;
+ @XmlAttribute
+ private Integer cacheSize;
public PollEnrichDefinition() {
}
@@ -89,6 +91,9 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
if (getAggregateOnException() != null) {
enricher.setAggregateOnException(getAggregateOnException());
}
+ if (getCacheSize() != null) {
+ enricher.setCacheSize(getCacheSize());
+ }
return enricher;
}
@@ -186,6 +191,18 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
return this;
}
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
+ * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public PollEnrichDefinition cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
// Properties
// -------------------------------------------------------------------------
@@ -237,4 +254,11 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
this.aggregateOnException = aggregateOnException;
}
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9873cbb..de3c7b4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.EmptyConsumerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
@@ -59,6 +60,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
private final Expression expression;
private long timeout;
private boolean aggregateOnException;
+ private int cacheSize;
/**
* Creates a new {@link PollEnricher}.
@@ -131,6 +133,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
this.aggregationStrategy = defaultAggregationStrategy();
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
public void process(Exchange exchange) throws Exception {
AsyncProcessorHelper.process(this, exchange);
}
@@ -275,7 +285,16 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
protected void doStart() throws Exception {
if (consumerCache == null) {
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
- consumerCache = new ConsumerCache(this, getCamelContext());
+ if (cacheSize < 0) {
+ consumerCache = new EmptyConsumerCache(this, camelContext);
+ LOG.debug("PollEnrich {} is not using ConsumerCache", this);
+ } else if (cacheSize == 0) {
+ consumerCache = new ConsumerCache(this, camelContext);
+ LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this);
+ } else {
+ consumerCache = new ConsumerCache(this, camelContext, cacheSize);
+ LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startServices(consumerCache, aggregationStrategy);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 122e72b..dd4b699 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -49,7 +49,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
return new RouteBuilder() {
public void configure() {
from("direct:a").recipientList(
- header("recipientListHeader").tokenize(",")).cacheSize(0);
+ header("recipientListHeader").tokenize(",")).cacheSize(-1);
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..5c125d6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionNoCacheTest extends ContextTestSupport {
+
+ public void testPollEnrichExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+ template.sendBody("seda:foo", "Hello World");
+ template.sendBody("seda:bar", "Bye World");
+
+ template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+ template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .pollEnrich().header("source").cacheSize(-1)
+ .to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 38a42ab..c14de38 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -21,14 +21,16 @@ import org.apache.camel.builder.RouteBuilder;
public class PollEnrichExpressionTest extends ContextTestSupport {
- public void testPollEnricExpression() throws Exception {
- getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+ public void testPollEnrichExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hi World");
template.sendBody("seda:foo", "Hello World");
template.sendBody("seda:bar", "Bye World");
+ template.sendBody("seda:foo", "Hi World");
template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+ template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
assertMockEndpointsSatisfied();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index c0e0c0b..3703be4 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -33,7 +33,7 @@
<route>
<from uri="direct:enricher-test-1"/>
<pollEnrich strategyRef="sampleAggregator">
- <ref>foo1</ref>
+ <simple>ref:foo1</simple>
</pollEnrich>
<to uri="mock:mock"/>
</route>
@@ -42,7 +42,7 @@
<route>
<from uri="direct:enricher-test-2"/>
<pollEnrich timeout="1000" strategyRef="sampleAggregator">
- <ref>foo2</ref>
+ <simple>ref:foo2</simple>
</pollEnrich>
<to uri="mock:mock"/>
</route>
@@ -50,7 +50,7 @@
<route>
<from uri="direct:enricher-test-3"/>
<pollEnrich timeout="-1" strategyRef="sampleAggregator">
- <ref>foo3</ref>
+ <simple>ref:foo3</simple>
</pollEnrich>
<to uri="mock:mock"/>
</route>
@@ -58,7 +58,7 @@
<route>
<from uri="direct:enricher-test-4"/>
<pollEnrich strategyRef="sampleAggregator">
- <ref>foo4</ref>
+ <simple>ref:foo4</simple>
</pollEnrich>
<to uri="mock:mock"/>
</route>
[05/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d918910
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d918910
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d918910
Branch: refs/heads/master
Commit: 2d9189100481e7ad305c600f014a3bf0d467bc3a
Parents: 5fdf8bc
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 11:35:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 11:35:42 2015 +0200
----------------------------------------------------------------------
.../processor/SpringEnrichExpressionTest.java | 30 ++++++++++++
.../SpringEnricherAggregateOnExceptionTest.xml | 8 +++-
.../spring/processor/enrichExpressionTest.xml | 50 ++++++++++++++++++++
.../apache/camel/spring/processor/enricher.xml | 4 +-
.../camel/spring/processor/enricherref.xml | 4 +-
5 files changed, 92 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
new file mode 100644
index 0000000..cd32b67
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.EnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringEnrichExpressionTest extends EnrichExpressionTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/enrichExpressionTest.xml");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
index 8084d3a..3be128e 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringEnricherAggregateOnExceptionTest.xml
@@ -25,12 +25,16 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
- <enrich uri="direct:foo" strategyRef="myAggregator" aggregateOnException="true"/>
+ <enrich strategyRef="myAggregator" aggregateOnException="true">
+ <constant>direct:foo</constant>
+ </enrich>
<to uri="mock:result"/>
</route>
<route>
<from uri="direct:start2"/>
- <enrich uri="direct:foo" strategyRef="myAggregator" aggregateOnException="false"/>
+ <enrich strategyRef="myAggregator" aggregateOnException="false">
+ <constant>direct:foo</constant>
+ </enrich>
<to uri="mock:result"/>
</route>
<route>
http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
new file mode 100644
index 0000000..91b7f42
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enrichExpressionTest.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:start"/>
+ <enrich>
+ <header>source</header>
+ </enrich>
+ <to uri="mock:result"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+
+ <route>
+ <from uri="direct:foo"/>
+ <transform>
+ <constant>Hello World</constant>
+ </transform>
+ </route>
+ <route>
+ <from uri="direct:bar"/>
+ <transform>
+ <constant>Bye World</constant>
+ </transform>
+ </route>
+ </camelContext>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
index 6d03630..918f926 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
@@ -26,7 +26,9 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
- <enrich uri="direct:resource" strategyRef="sampleAggregator"/>
+ <enrich strategyRef="sampleAggregator">
+ <constant>direct:resource</constant>
+ </enrich>
<to uri="mock:result"/>
</route>
<route>
http://git-wip-us.apache.org/repos/asf/camel/blob/2d918910/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
index 049d289..2825339 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricherref.xml
@@ -30,7 +30,9 @@
<route>
<from uri="direct:start"/>
- <enrich ref="cool" strategyRef="sampleAggregator"/>
+ <enrich strategyRef="sampleAggregator">
+ <simple>ref:cool</simple>
+ </enrich>
<to uri="mock:result"/>
</route>
<route>
[04/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5fdf8bc5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5fdf8bc5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5fdf8bc5
Branch: refs/heads/master
Commit: 5fdf8bc5179326fec39f21508f6727db6f89e515
Parents: 1b4af69
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 11:09:30 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 11:09:30 2015 +0200
----------------------------------------------------------------------
.../apache/camel/model/EnrichDefinition.java | 169 +++++++++----------
.../camel/model/PollEnrichDefinition.java | 2 -
.../apache/camel/model/ProcessorDefinition.java | 93 +++++-----
.../org/apache/camel/processor/Enricher.java | 139 +++++++++------
.../enricher/EnrichExpressionNoCacheTest.java | 49 ++++++
.../enricher/EnrichExpressionTest.java | 49 ++++++
6 files changed, 317 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 6f0e358..902524e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -23,14 +23,13 @@ import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.processor.Enricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
/**
* Enriches a message with data from a secondary resource
@@ -40,13 +39,7 @@ import org.apache.camel.util.ObjectHelper;
@Metadata(label = "eip,transformation")
@XmlRootElement(name = "enrich")
@XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> implements EndpointRequiredDefinition {
- @XmlAttribute(name = "uri")
- private String resourceUri;
- // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
- @XmlAttribute(name = "ref")
- @Deprecated
- private String resourceRef;
+public class EnrichDefinition extends NoOutputExpressionNode {
@XmlAttribute(name = "strategyRef")
private String aggregationStrategyRef;
@XmlAttribute(name = "strategyMethodName")
@@ -59,67 +52,43 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
private AggregationStrategy aggregationStrategy;
@XmlAttribute
private Boolean shareUnitOfWork;
+ @XmlAttribute
+ private Integer cacheSize;
public EnrichDefinition() {
- this(null, null);
+ this(null);
}
- public EnrichDefinition(String resourceUri) {
- this(null, resourceUri);
- }
-
- public EnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri) {
+ public EnrichDefinition(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
- this.resourceUri = resourceUri;
}
@Override
public String toString() {
- return "Enrich[" + description() + " " + aggregationStrategy + "]";
- }
-
- protected String description() {
- return FromDefinition.description(resourceUri, resourceRef, (Endpoint) null);
+ return "Enrich[" + getExpression() + "]";
}
@Override
public String getLabel() {
- return "enrich[" + description() + "]";
- }
-
- @Override
- public String getEndpointUri() {
- if (resourceUri != null) {
- return resourceUri;
- } else {
- return null;
- }
+ return "enrich[" + getExpression() + "]";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) {
- throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint");
- }
// lookup endpoint
- Endpoint endpoint;
- if (resourceUri != null) {
- endpoint = routeContext.resolveEndpoint(resourceUri);
- } else {
- endpoint = routeContext.resolveEndpoint(null, resourceRef);
- }
+
+ Expression exp = getExpression().createExpression(routeContext);
boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
- Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork);
+ Enricher enricher = new Enricher(exp);
+ enricher.setShareUnitOfWork(isShareUnitOfWork);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
- if (strategy == null) {
- enricher.setDefaultAggregationStrategy();
- } else {
+ if (strategy != null) {
enricher.setAggregationStrategy(strategy);
}
- if (getAggregateOnException() != null) {
- enricher.setAggregateOnException(getAggregateOnException());
+ if (aggregateOnException != null) {
+ enricher.setAggregateOnException(aggregateOnException);
}
return enricher;
}
@@ -149,39 +118,85 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return strategy;
}
- public String getResourceUri() {
- return resourceUri;
+ // Fluent API
+ // -------------------------------------------------------------------------
+
+ /**
+ * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public EnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy);
+ return this;
+ }
+
+ /**
+ * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public EnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+ setAggregationStrategyRef(aggregationStrategyRef);
+ return this;
}
/**
- * The endpoint uri for the external service to enrich from. You must use either uri or ref.
+ * This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.
*/
- public void setResourceUri(String resourceUri) {
- this.resourceUri = resourceUri;
+ public EnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+ setAggregationStrategyMethodName(aggregationStrategyMethodName);
+ return this;
}
- public String getResourceRef() {
- return resourceRef;
+ /**
+ * If this option is false then the aggregate method is not used if there was no data to enrich.
+ * If this option is true then null values are used as the oldExchange (when no data to enrich),
+ * when using POJOs as the AggregationStrategy.
+ */
+ public EnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+ setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+ return this;
}
/**
- * Refers to the endpoint for the external service to enrich from. You must use either uri or ref.
+ * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+ * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+ * to do if there was an exception in the aggregate method. For example to suppress the exception
+ * or set a custom message body etc.
+ */
+ public EnrichDefinition aggregateOnException(boolean aggregateOnException) {
+ setAggregateOnException(aggregateOnException);
+ return this;
+ }
+
+ /**
+ * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
+ * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
+ * This means the resource exchange has its own individual unit of work.
+ */
+ public EnrichDefinition shareUnitOfWork() {
+ setShareUnitOfWork(true);
+ return this;
+ }
+
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this enrich, when uris are reused.
*
- * @deprecated use uri with ref:uri instead
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
*/
- @Deprecated
- public void setResourceRef(String resourceRef) {
- this.resourceRef = resourceRef;
+ public EnrichDefinition cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
}
+ // Properties
+ // -------------------------------------------------------------------------
+
public String getAggregationStrategyRef() {
return aggregationStrategyRef;
}
- /**
- * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategyRef(String aggregationStrategyRef) {
this.aggregationStrategyRef = aggregationStrategyRef;
}
@@ -190,9 +205,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return aggregationStrategyMethodName;
}
- /**
- * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
}
@@ -201,11 +213,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return aggregationStrategyMethodAllowNull;
}
- /**
- * If this option is false then the aggregate method is not used if there was no data to enrich.
- * If this option is true then null values is used as the oldExchange (when no data to enrich),
- * when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
}
@@ -214,10 +221,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return aggregationStrategy;
}
- /**
- * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
@@ -226,12 +229,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return aggregateOnException;
}
- /**
- * If this option is false then the aggregate method is not used if there was an exception thrown while trying
- * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
- * to do if there was an exception in the aggregate method. For example to suppress the exception
- * or set a custom message body etc.
- */
public void setAggregateOnException(Boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
@@ -240,13 +237,15 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
return shareUnitOfWork;
}
- /**
- * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
- * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
- * This means the resource exchange has its own individual unit of work.
- */
public void setShareUnitOfWork(Boolean shareUnitOfWork) {
this.shareUnitOfWork = shareUnitOfWork;
}
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 18801d4..e7c84a6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -126,8 +126,6 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
// Fluent API
// -------------------------------------------------------------------------
- // TODO: add cacheSize option
-
/**
* Timeout in millis when polling from the external service.
* <p/>
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 589df86..920d85a 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3136,16 +3136,29 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
/**
* The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
* enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+ * <p/>
+ * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+ * to obtain the additional data, whereas pollEnrich uses a polling consumer.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.Enricher
+ */
+ public Type enrich(String resourceUri) {
+ return enrich(resourceUri, null);
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
*
* @param resourceUri URI of resource endpoint for obtaining additional data.
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
- @SuppressWarnings("unchecked")
public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) {
- addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
- return (Type) this;
+ return enrich(resourceUri, aggregationStrategy, false);
}
/**
@@ -3159,12 +3172,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
- @SuppressWarnings("unchecked")
public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
- EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
- enrich.setAggregateOnException(aggregateOnException);
- addOutput(enrich);
- return (Type) this;
+ return enrich(resourceUri, aggregationStrategy, aggregateOnException, false);
}
/**
@@ -3181,10 +3190,12 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) {
- EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
- enrich.setAggregateOnException(aggregateOnException);
- enrich.setShareUnitOfWork(shareUnitOfWork);
- addOutput(enrich);
+ EnrichDefinition answer = new EnrichDefinition();
+ answer.setExpression(new ConstantExpression(resourceUri));
+ answer.setAggregationStrategy(aggregationStrategy);
+ answer.setAggregateOnException(aggregateOnException);
+ answer.setShareUnitOfWork(shareUnitOfWork);
+ addOutput(answer);
return (Type) this;
}
@@ -3193,16 +3204,15 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* enriches an exchange with additional data obtained from a <code>resourceUri</code>.
* <p/>
* The difference between this and {@link #pollEnrich(String)} is that this uses a producer
- * to obatin the additional data, where as pollEnrich uses a polling consumer.
+ * to obtain the additional data, where as pollEnrich uses a polling consumer.
*
- * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @param resourceRef Reference of resource endpoint for obtaining additional data.
+ * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
- @SuppressWarnings("unchecked")
- public Type enrich(String resourceUri) {
- addOutput(new EnrichDefinition(resourceUri));
- return (Type) this;
+ public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
+ return enrichRef(resourceRef, aggregationStrategyRef, false);
}
/**
@@ -3214,16 +3224,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*
* @param resourceRef Reference of resource endpoint for obtaining additional data.
* @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+ * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+ * an exception was thrown.
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
- @SuppressWarnings("unchecked")
- public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
- EnrichDefinition enrich = new EnrichDefinition();
- enrich.setResourceRef(resourceRef);
- enrich.setAggregationStrategyRef(aggregationStrategyRef);
- addOutput(enrich);
- return (Type) this;
+ public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) {
+ return enrichRef(resourceRef, aggregationStrategyRef, aggregateOnException, false);
}
/**
@@ -3237,16 +3244,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
* @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
* an exception was thrown.
+ * @param shareUnitOfWork whether to share unit of work
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
@SuppressWarnings("unchecked")
- public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) {
- EnrichDefinition enrich = new EnrichDefinition();
- enrich.setResourceRef(resourceRef);
- enrich.setAggregationStrategyRef(aggregationStrategyRef);
- enrich.setAggregateOnException(aggregateOnException);
- addOutput(enrich);
+ public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
+ EnrichDefinition answer = new EnrichDefinition();
+ answer.setExpression(new SimpleExpression("ref:" + resourceRef));
+ answer.setAggregationStrategyRef(aggregationStrategyRef);
+ answer.setAggregateOnException(aggregateOnException);
+ answer.setShareUnitOfWork(shareUnitOfWork);
+ addOutput(answer);
return (Type) this;
}
@@ -3257,23 +3266,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* The difference between this and {@link #pollEnrich(String)} is that this uses a producer
* to obtain the additional data, where as pollEnrich uses a polling consumer.
*
- * @param resourceRef Reference of resource endpoint for obtaining additional data.
- * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
- * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
- * an exception was thrown.
- * @param shareUnitOfWork whether to share unit of work
- * @return the builder
- * @see org.apache.camel.processor.Enricher
+ * @return an expression builder clause to set the expression to use for computing the endpoint to use
+ * @see org.apache.camel.processor.Enricher
*/
- @SuppressWarnings("unchecked")
- public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
- EnrichDefinition enrich = new EnrichDefinition();
- enrich.setResourceRef(resourceRef);
- enrich.setAggregationStrategyRef(aggregationStrategyRef);
- enrich.setAggregateOnException(aggregateOnException);
- enrich.setShareUnitOfWork(shareUnitOfWork);
- addOutput(enrich);
- return (Type) this;
+ public ExpressionClause<EnrichDefinition> enrich() {
+ EnrichDefinition answer = new EnrichDefinition();
+ addOutput(answer);
+ return ExpressionClause.createAndSetExpression(answer);
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index b0532ba..cbdf104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -18,18 +18,19 @@ package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
+import org.apache.camel.Expression;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
+import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -39,6 +40,7 @@ import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
@@ -53,38 +55,28 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
*
* @see PollEnricher
*/
-public class Enricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware {
+public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);
+ private CamelContext camelContext;
private String id;
+ private ProducerCache producerCache;
+ private final Expression expression;
private AggregationStrategy aggregationStrategy;
- private Producer producer;
private boolean aggregateOnException;
private boolean shareUnitOfWork;
+ private int cacheSize;
- /**
- * Creates a new {@link Enricher}. The default aggregation strategy is to
- * copy the additional data obtained from the enricher's resource over the
- * input data. When using the copy aggregation strategy the enricher
- * degenerates to a normal transformer.
- *
- * @param producer producer to resource endpoint.
- */
- public Enricher(Producer producer) {
- this(defaultAggregationStrategy(), producer, false);
+ public Enricher(Expression expression) {
+ this.expression = expression;
}
- /**
- * Creates a new {@link Enricher}.
- *
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
- * @param producer producer to resource endpoint.
- * @param shareUnitOfWork whether to share unit of work
- */
- public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) {
- this.aggregationStrategy = aggregationStrategy;
- this.producer = producer;
- this.shareUnitOfWork = shareUnitOfWork;
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
}
public String getId() {
@@ -95,11 +87,6 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
this.id = id;
}
- /**
- * Sets the aggregation strategy for this enricher.
- *
- * @param aggregationStrategy the aggregationStrategy to set
- */
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
@@ -112,23 +99,24 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
return aggregateOnException;
}
- /**
- * Whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
- * an exception was thrown.
- */
public void setAggregateOnException(boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
- /**
- * Sets the default aggregation strategy for this enricher.
- */
- public void setDefaultAggregationStrategy() {
- this.aggregationStrategy = defaultAggregationStrategy();
+ public boolean isShareUnitOfWork() {
+ return shareUnitOfWork;
+ }
+
+ public void setShareUnitOfWork(boolean shareUnitOfWork) {
+ this.shareUnitOfWork = shareUnitOfWork;
}
- public Endpoint getEndpoint() {
- return producer.getEndpoint();
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
}
public void process(Exchange exchange) throws Exception {
@@ -148,6 +136,22 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
* @param exchange input data.
*/
public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ // which producer to use
+ final Producer producer;
+ final Endpoint endpoint;
+
+ // use dynamic endpoint so calculate the endpoint to use
+ try {
+ Object recipient = expression.evaluate(exchange, Object.class);
+ endpoint = resolveEndpoint(exchange, recipient);
+ // acquire the producer from the cache
+ producer = producerCache.acquireProducer(endpoint);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
final Endpoint destination = producer.getEndpoint();
@@ -192,6 +196,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
// set property with the uri of the endpoint enriched so we can use that for tracing etc
exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
+ // return the producer back to the cache
+ try {
+ producerCache.releaseProducer(endpoint, producer);
+ } catch (Exception e) {
+ // ignore
+ }
+
callback.done(false);
}
});
@@ -236,10 +247,25 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
// set property with the uri of the endpoint enriched so we can use that for tracing etc
exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
+ // return the producer back to the cache
+ try {
+ producerCache.releaseProducer(endpoint, producer);
+ } catch (Exception e) {
+ // ignore
+ }
+
callback.done(true);
return true;
}
+ protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+ // trim strings as end users might have added spaces between separators
+ if (recipient instanceof String) {
+ recipient = ((String)recipient).trim();
+ }
+ return ExchangeHelper.resolveEndpoint(exchange, recipient);
+ }
+
/**
* Creates a new {@link DefaultExchange} instance from the given
* <code>exchange</code>. The resulting exchange's pattern is defined by
@@ -273,21 +299,34 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
return new CopyAggregationStrategy();
}
- public boolean isShareUnitOfWork() {
- return shareUnitOfWork;
- }
-
@Override
public String toString() {
- return "Enrich[" + producer.getEndpoint() + "]";
+ return "Enrich[" + expression + "]";
}
protected void doStart() throws Exception {
- ServiceHelper.startServices(aggregationStrategy, producer);
+ if (aggregationStrategy == null) {
+ aggregationStrategy = defaultAggregationStrategy();
+ }
+
+ if (producerCache == null) {
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ LOG.debug("Enricher {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ LOG.debug("Enricher {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
+ }
+
+ ServiceHelper.startServices(producerCache, aggregationStrategy);
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(producer, aggregationStrategy);
+ ServiceHelper.stopServices(aggregationStrategy, producerCache);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..12d4bd7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionNoCacheTest extends ContextTestSupport {
+
+ public void testEnrichExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .enrich().header("source").cacheSize(-1)
+ .to("mock:result");
+
+ from("direct:foo").transform().constant("Hello World");
+
+ from("direct:bar").transform().constant("Bye World");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
new file mode 100644
index 0000000..0248884
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionTest extends ContextTestSupport {
+
+ public void testEnrichExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+ template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .enrich().header("source")
+ .to("mock:result");
+
+ from("direct:foo").transform().constant("Hello World");
+
+ from("direct:bar").transform().constant("Bye World");
+ }
+ };
+ }
+}
[02/10] camel git commit: CAMEL-4596: pollEnrich supports dynamic
uris.
Posted by da...@apache.org.
CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5be4d6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5be4d6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5be4d6e
Branch: refs/heads/master
Commit: b5be4d6ed829700ca9af852940dcf28f3c00b598
Parents: 9fd4d54
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 09:34:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 10:06:13 2015 +0200
----------------------------------------------------------------------
.../camel/model/PollEnrichDefinition.java | 184 ++++++-------------
.../apache/camel/model/ProcessorDefinition.java | 43 +++--
.../apache/camel/processor/PollEnricher.java | 91 +++------
.../enricher/PollEnrichExpressionTest.java | 2 +-
.../SpringPollEnrichExpressionTest.java | 30 +++
.../processor/pollEnrichExpressionTest.xml | 37 ++++
.../camel/spring/processor/pollEnricher.xml | 66 ++++---
.../camel/spring/processor/pollEnricherRef.xml | 78 ++++----
8 files changed, 267 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index a5d7cb1..eb1247b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -19,22 +19,17 @@ package org.apache.camel.model;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
/**
* Enriches messages with data polled from a secondary resource
@@ -44,15 +39,7 @@ import org.apache.camel.util.ObjectHelper;
@Metadata(label = "eip,transformation")
@XmlRootElement(name = "pollEnrich")
@XmlAccessorType(XmlAccessType.FIELD)
-public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition {
- @XmlElementRef
- private ExpressionDefinition expression;
- @XmlAttribute(name = "uri")
- private String resourceUri;
- // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref:
- @XmlAttribute(name = "ref")
- @Deprecated
- private String resourceRef;
+public class PollEnrichDefinition extends NoOutputExpressionNode {
@XmlAttribute @Metadata(defaultValue = "-1")
private Long timeout;
@XmlAttribute(name = "strategyRef")
@@ -69,65 +56,29 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
public PollEnrichDefinition() {
}
- public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) {
+ public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
this.aggregationStrategy = aggregationStrategy;
- this.resourceUri = resourceUri;
this.timeout = timeout;
}
@Override
public String toString() {
- return "PollEnrich[" + description() + " " + aggregationStrategy + "]";
+ return "PollEnrich[" + getExpression() + "]";
}
- protected String description() {
- return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null);
- }
-
@Override
public String getLabel() {
- return "pollEnrich[" + description() + "]";
- }
-
- @Override
- public String getEndpointUri() {
- if (resourceUri != null) {
- return resourceUri;
- } else {
- return null;
- }
+ return "pollEnrich[" + getExpression() + "]";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) {
- throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
- }
-
- // lookup endpoint
- PollingConsumer consumer = null;
- if (resourceUri != null) {
- Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
- consumer = endpoint.createPollingConsumer();
- } else if (resourceRef != null) {
- Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef);
- consumer = endpoint.createPollingConsumer();
- }
// if no timeout then we should block, and there use a negative timeout
long time = timeout != null ? timeout : -1;
+ Expression exp = getExpression().createExpression(routeContext);
- // create the expression if any was configured
- Expression exp = createResourceExpression(routeContext);
-
- PollEnricher enricher;
- if (exp != null) {
- enricher = new PollEnricher(null, exp, time);
- } else if (consumer != null) {
- enricher = new PollEnricher(null, consumer, time);
- } else {
- throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured");
- }
+ PollEnricher enricher = new PollEnricher(exp, time);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
if (strategy == null) {
@@ -167,60 +118,81 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return strategy;
}
+ // Fluent API
+ // -------------------------------------------------------------------------
+
+ // TODO: add cacheSize option
+
/**
- * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from.
- *
- * @param routeContext the route context
- * @return the created expression, or <tt>null</tt> if no expression configured
+ * Timeout in millis when polling from the external service.
+ * <p/>
+ * The timeout influences the poll enrich behavior. It basically operates in three different modes:
+ * <ul>
+ * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
+ * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
+ * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
+ * </ul>
+ * The default value is -1, so the method could block indefinitely; it is therefore recommended to use a timeout value
*/
- protected Expression createResourceExpression(RouteContext routeContext) {
- if (expression != null) {
- return expression.createExpression(routeContext);
- } else {
- return null;
- }
+ public PollEnrichDefinition timeout(long timeout) {
+ setTimeout(timeout);
+ return this;
}
- public String getResourceUri() {
- return resourceUri;
+ /**
+ * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
+ */
+ public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy);
+ return this;
}
/**
- * The endpoint uri for the external service to poll enrich from. You must use either uri or ref.
+ * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
+ * By default Camel will use the reply from the external service as outgoing message.
*/
- public void setResourceUri(String resourceUri) {
- this.resourceUri = resourceUri;
+ public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+ setAggregationStrategyRef(aggregationStrategyRef);
+ return this;
}
- public String getResourceRef() {
- return resourceRef;
+ /**
+ * This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy.
+ */
+ public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
+ setAggregationStrategyMethodName(aggregationStrategyMethodName);
+ return this;
+ }
+
+ /**
+ * If this option is false then the aggregate method is not used if there was no data to enrich.
+ * If this option is true then null values are used as the oldExchange (when no data to enrich),
+ * when using POJOs as the AggregationStrategy.
+ */
+ public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
+ setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+ return this;
}
/**
- * Refers to the endpoint for the external service to poll enrich from. You must use either uri or ref.
- *
- * @deprecated use uri with ref:uri instead
+ * If this option is false then the aggregate method is not used if there was an exception thrown while trying
+ * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
+ * to do if there was an exception in the aggregate method. For example to suppress the exception
+ * or set a custom message body etc.
*/
- @Deprecated
- public void setResourceRef(String resourceRef) {
- this.resourceRef = resourceRef;
+ public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
+ setAggregateOnException(aggregateOnException);
+ return this;
}
+ // Properties
+ // -------------------------------------------------------------------------
+
public Long getTimeout() {
return timeout;
}
- /**
- * Timeout in millis when polling from the external service.
- * <p/>
- * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
- * <ul>
- * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
- * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
- * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
- * </ul>
- * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
- */
public void setTimeout(Long timeout) {
this.timeout = timeout;
}
@@ -229,10 +201,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyRef;
}
- /**
- * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategyRef(String aggregationStrategyRef) {
this.aggregationStrategyRef = aggregationStrategyRef;
}
@@ -241,9 +209,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyMethodName;
}
- /**
- * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
this.aggregationStrategyMethodName = aggregationStrategyMethodName;
}
@@ -252,11 +217,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategyMethodAllowNull;
}
- /**
- * If this option is false then the aggregate method is not used if there was no data to enrich.
- * If this option is true then null values is used as the oldExchange (when no data to enrich),
- * when using POJOs as the AggregationStrategy.
- */
public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
}
@@ -265,10 +225,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregationStrategy;
}
- /**
- * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
- * By default Camel will use the reply from the external service as outgoing message.
- */
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
@@ -277,26 +233,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio
return aggregateOnException;
}
- /**
- * If this option is false then the aggregate method is not used if there was an exception thrown while trying
- * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
- * to do if there was an exception in the aggregate method. For example to suppress the exception
- * or set a custom message body etc.
- */
public void setAggregateOnException(Boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
- public ExpressionDefinition getExpression() {
- return expression;
- }
-
- /**
- * Sets an expression to use for dynamic computing the endpoint to poll from.
- * <p/>
- * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use.
- */
- public void setExpression(ExpressionDefinition expression) {
- this.expression = expression;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b525c0f..589df86 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -52,6 +52,7 @@ import org.apache.camel.builder.ProcessorBuilder;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.LanguageExpression;
+import org.apache.camel.model.language.SimpleExpression;
import org.apache.camel.model.rest.RestDefinition;
import org.apache.camel.processor.InterceptEndpointProcessor;
import org.apache.camel.processor.Pipeline;
@@ -3292,8 +3293,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri) {
- addOutput(new PollEnrichDefinition(null, resourceUri, -1));
- return (Type) this;
+ return pollEnrich(resourceUri, null);
}
/**
@@ -3314,8 +3314,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1));
- return (Type) this;
+ return pollEnrich(resourceUri, -1, aggregationStrategy);
}
/**
@@ -3338,8 +3337,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
- addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
- return (Type) this;
+ return pollEnrich(resourceUri, timeout, aggregationStrategy, false);
}
/**
@@ -3364,7 +3362,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
- PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout);
+ PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
+ pollEnrich.setExpression(new ConstantExpression(resourceUri));
+ pollEnrich.setTimeout(timeout);
+ pollEnrich.setAggregationStrategy(aggregationStrategy);
pollEnrich.setAggregateOnException(aggregateOnException);
addOutput(pollEnrich);
return (Type) this;
@@ -3389,8 +3390,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*/
@SuppressWarnings("unchecked")
public Type pollEnrich(String resourceUri, long timeout) {
- addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
- return (Type) this;
+ return pollEnrich(resourceUri, timeout, null);
}
/**
@@ -3414,7 +3414,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
@SuppressWarnings("unchecked")
public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) {
PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
- pollEnrich.setResourceRef(resourceRef);
+ pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
pollEnrich.setTimeout(timeout);
pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
addOutput(pollEnrich);
@@ -3444,7 +3444,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
@SuppressWarnings("unchecked")
public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) {
PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
- pollEnrich.setResourceRef(resourceRef);
+ pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef));
pollEnrich.setTimeout(timeout);
pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
pollEnrich.setAggregateOnException(aggregateOnException);
@@ -3484,6 +3484,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
}
/**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>
+ * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+ * <p/>
+ * The difference between this and {@link #enrich(String)} is that this uses a consumer
+ * to obtain the additional data, whereas enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @return an expression builder clause to set the expression to use for computing the endpoint to poll from
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ public ExpressionClause<PollEnrichDefinition> pollEnrich() {
+ PollEnrichDefinition answer = new PollEnrichDefinition();
+ addOutput(answer);
+ return ExpressionClause.createAndSetExpression(answer);
+ }
+
+ /**
* Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
* a callback when the {@link org.apache.camel.Exchange} has finished being processed.
* The hook invoke callbacks for either onComplete or onFailure.
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index ab313fb..9873cbb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -22,7 +22,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.PollingConsumer;
@@ -50,55 +49,25 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
*
* @see Enricher
*/
-public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware {
+public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
private CamelContext camelContext;
private ConsumerCache consumerCache;
private String id;
private AggregationStrategy aggregationStrategy;
- private final PollingConsumer consumer;
private final Expression expression;
private long timeout;
private boolean aggregateOnException;
/**
- * Creates a new {@link PollEnricher}. The default aggregation strategy is to
- * copy the additional data obtained from the enricher's resource over the
- * input data. When using the copy aggregation strategy the enricher
- * degenerates to a normal transformer.
- *
- * @param consumer consumer to resource endpoint.
- */
- public PollEnricher(PollingConsumer consumer) {
- this(defaultAggregationStrategy(), consumer, 0);
- }
-
- /**
* Creates a new {@link PollEnricher}.
*
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
- * @param consumer consumer to resource endpoint.
- * @param timeout timeout in millis
- */
- public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
- this.aggregationStrategy = aggregationStrategy;
- this.consumer = consumer;
- this.expression = null;
- this.timeout = timeout;
- }
-
- /**
- * Creates a new {@link PollEnricher}.
- *
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @param expression expression to use to compute the endpoint to poll from.
* @param timeout timeout in millis
*/
- public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) {
- this.aggregationStrategy = aggregationStrategy;
+ public PollEnricher(Expression expression, long timeout) {
this.expression = expression;
- this.consumer = null;
this.timeout = timeout;
}
@@ -118,10 +87,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
this.id = id;
}
- public Endpoint getEndpoint() {
- return consumer != null ? consumer.getEndpoint() : null;
- }
-
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
@@ -193,34 +158,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
}
// which consumer to use
- PollingConsumer target = consumer;
- Endpoint endpoint = null;
+ PollingConsumer consumer;
+ Endpoint endpoint;
// use dynamic endpoint so calculate the endpoint to use
- if (expression != null) {
- try {
- Object recipient = expression.evaluate(exchange, Object.class);
- endpoint = resolveEndpoint(exchange, recipient);
- // acquire the consumer from the cache
- target = consumerCache.acquirePollingConsumer(endpoint);
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
+ try {
+ Object recipient = expression.evaluate(exchange, Object.class);
+ endpoint = resolveEndpoint(exchange, recipient);
+ // acquire the consumer from the cache
+ consumer = consumerCache.acquirePollingConsumer(endpoint);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
Exchange resourceExchange;
try {
if (timeout < 0) {
- LOG.debug("Consumer receive: {}", target);
+ LOG.debug("Consumer receive: {}", consumer);
resourceExchange = consumer.receive();
} else if (timeout == 0) {
- LOG.debug("Consumer receiveNoWait: {}", target);
- resourceExchange = target.receiveNoWait();
+ LOG.debug("Consumer receiveNoWait: {}", consumer);
+ resourceExchange = consumer.receiveNoWait();
} else {
- LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target);
- resourceExchange = target.receive(timeout);
+ LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
+ resourceExchange = consumer.receive(timeout);
}
if (resourceExchange == null) {
@@ -234,9 +197,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
return true;
} finally {
// return the consumer back to the cache
- if (expression != null) {
- consumerCache.releasePollingConsumer(endpoint, target);
- }
+ consumerCache.releasePollingConsumer(endpoint, consumer);
}
try {
@@ -262,9 +223,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
- exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+ exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
} else {
- exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri());
+ exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
}
} catch (Throwable e) {
exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
@@ -308,23 +269,23 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp
@Override
public String toString() {
- return "PollEnrich[" + consumer + "]";
+ return "PollEnrich[" + expression + "]";
}
protected void doStart() throws Exception {
- if (expression != null && consumerCache == null) {
+ if (consumerCache == null) {
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
consumerCache = new ConsumerCache(this, getCamelContext());
}
- ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.startServices(consumerCache, aggregationStrategy);
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.stopServices(aggregationStrategy, consumerCache);
}
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy);
+ ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 4e983a7..38a42ab 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -39,7 +39,7 @@ public class PollEnrichExpressionTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .pollEnrich(header("source"), 1000, null, false)
+ .pollEnrich().header("source")
.to("mock:result");
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
new file mode 100644
index 0000000..8017e8d
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.enricher.PollEnrichExpressionTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringPollEnrichExpressionTest extends PollEnrichExpressionTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnrichExpressionTest.xml");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
new file mode 100644
index 0000000..cb932e3
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:start"/>
+ <pollEnrich>
+ <header>source</header>
+ </pollEnrich>
+ <to uri="mock:result"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+ </camelContext>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
index f63b63a..f0f71ce 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
@@ -22,34 +22,42 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <camelContext xmlns="http://camel.apache.org/schema/spring">
- <!-- START SNIPPET: e1 -->
- <route>
- <from uri="direct:enricher-test-1"/>
- <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- <!-- END SNIPPET: e1 -->
-
- <route>
- <from uri="direct:enricher-test-2"/>
- <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-3"/>
- <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-4"/>
- <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- </camelContext>
-
- <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <constant>seda:foo1</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+ <constant>seda:foo2</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+ <constant>seda:foo3</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <constant>seda:foo4</constant>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index 6046e7d..c0e0c0b 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -22,40 +22,48 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <camelContext xmlns="http://camel.apache.org/schema/spring">
-
- <endpoint id="foo1" uri="seda:foo1"/>
- <endpoint id="foo2" uri="seda:foo2"/>
- <endpoint id="foo3" uri="seda:foo3"/>
- <endpoint id="foo4" uri="seda:foo4"/>
-
- <!-- START SNIPPET: e1 -->
- <route>
- <from uri="direct:enricher-test-1"/>
- <pollEnrich ref="foo1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- <!-- END SNIPPET: e1 -->
-
- <route>
- <from uri="direct:enricher-test-2"/>
- <pollEnrich ref="foo2" timeout="1000" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-3"/>
- <pollEnrich ref="foo3" timeout="-1" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
-
- <route>
- <from uri="direct:enricher-test-4"/>
- <pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
- <to uri="mock:mock"/>
- </route>
- </camelContext>
-
- <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+ <endpoint id="foo1" uri="seda:foo1"/>
+ <endpoint id="foo2" uri="seda:foo2"/>
+ <endpoint id="foo3" uri="seda:foo3"/>
+ <endpoint id="foo4" uri="seda:foo4"/>
+
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <ref>foo1</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
+
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich timeout="1000" strategyRef="sampleAggregator">
+ <ref>foo2</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich timeout="-1" strategyRef="sampleAggregator">
+ <ref>foo3</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich strategyRef="sampleAggregator">
+ <ref>foo4</ref>
+ </pollEnrich>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>
[09/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe8bf8d6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe8bf8d6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe8bf8d6
Branch: refs/heads/master
Commit: fe8bf8d64c6c57775610ca0b8c7b87bb4efb601a
Parents: b07edd0
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:50:27 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:50:27 2015 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/camel/model/ProcessorDefinition.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fe8bf8d6/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 920d85a..2359774 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3173,7 +3173,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @see org.apache.camel.processor.Enricher
*/
public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) {
- return enrich(resourceUri, aggregationStrategy, false, false);
+ return enrich(resourceUri, aggregationStrategy, aggregateOnException, false);
}
/**
[06/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80dbbc40
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80dbbc40
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80dbbc40
Branch: refs/heads/master
Commit: 80dbbc4067f7afa767a62dc7daea92096c95c99a
Parents: 2d91891
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:09:56 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:09:56 2015 +0200
----------------------------------------------------------------------
.../apache/camel/component/cxf/GreeterEnrichRouterContext.xml | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/80dbbc40/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
index c6ff1c5..3773033 100644
--- a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
+++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/GreeterEnrichRouterContext.xml
@@ -52,7 +52,9 @@
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route errorHandlerRef="noErrorHandler">
<from ref="routerEndpoint" />
- <enrich ref="serviceEndpoint" />
+ <enrich>
+ <simple>ref:serviceEndpoint</simple>
+ </enrich>
</route>
</camelContext>
[07/10] camel git commit: CAMEL-8956: Add failsafe until we have
better namespace parsing with factory beans.
Posted by da...@apache.org.
CAMEL-8956: Add failsafe until we have better namespace parsing with factory beans.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/39c51705
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/39c51705
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/39c51705
Branch: refs/heads/master
Commit: 39c5170589d1d15ccf6ca975f7cdbd046b54db87
Parents: 80dbbc4
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:10:28 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:10:28 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/camel/component/cxf/CxfProducer.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/39c51705/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
index 0aa4fa9..f778e5f 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.cxf.common.message.CxfConstants;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.cxf.Bus;
import org.apache.cxf.binding.soap.model.SoapHeaderInfo;
import org.apache.cxf.endpoint.Client;
@@ -72,6 +73,9 @@ public class CxfProducer extends DefaultProducer implements AsyncProcessor {
@Override
protected void doStart() throws Exception {
+ // failsafe as cxf may not ensure the endpoint is started (CAMEL-8956)
+ ServiceHelper.startService(endpoint);
+
if (client == null) {
client = endpoint.createClient();
}
[10/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe5960ae
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe5960ae
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe5960ae
Branch: refs/heads/master
Commit: fe5960aef0d8040b651762b6d3423a610f87176d
Parents: fe8bf8d
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 14:19:06 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 14:19:06 2015 +0200
----------------------------------------------------------------------
.../org/apache/camel/scala/dsl/SEnrichDefinition.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fe5960ae/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
index 48a40f5..450fe84 100644
--- a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
+++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SEnrichDefinition.scala
@@ -27,7 +27,10 @@ case class SEnrichDefinition(override val target: EnrichDefinition)(implicit val
def aggregationStrategy(strategy: AggregationStrategy) = wrap(target.setAggregationStrategy(strategy))
def aggregationStrategyRef(ref: String) = wrap(target.setAggregationStrategyRef(ref))
-
- def resourceRef(ref: String) = wrap(target.setResourceRef(ref))
- def resourceUri(resourceUri: String) = wrap(target.setResourceUri(resourceUri))
+ def aggregationStrategyMethodName(name: String) = wrap(target.setAggregationStrategyMethodName(name))
+ def aggregationStrategyMethodAllowNull(allowNull: Boolean) = wrap(target.setAggregationStrategyMethodAllowNull(allowNull))
+ def aggregateOnException(aggregateOnException: Boolean) = wrap(target.setAggregateOnException(aggregateOnException))
+ def shareUnitOfWork() = wrap(target.setShareUnitOfWork(true))
+ def cacheSize(size: Integer) = wrap(target.setCacheSize(size))
+
}
[08/10] camel git commit: CAMEL-4596: enrich supports dynamic uris.
Posted by da...@apache.org.
CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b07edd0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b07edd0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b07edd0a
Branch: refs/heads/master
Commit: b07edd0a2f535ebb9d73bd31736ac838523e078b
Parents: 39c5170
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 13 12:48:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 13 12:48:20 2015 +0200
----------------------------------------------------------------------
.../impl/MultipleLifecycleStrategyTest.java | 2 +-
...roducerRouteAddRemoveRegisterAlwaysTest.java | 6 +--
.../management/ManagedRouteAddRemoveTest.java | 42 ++++++++++----------
.../model/GatherAllStaticEndpointUrisTest.java | 2 +-
4 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index a3cf751..f39a1ba 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -50,7 +50,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
context.removeComponent("log");
context.stop();
- List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
+ List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
"onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
assertEquals(expectedEvents, dummy1.getEvents());
http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index 86ec502..606be4b 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -47,7 +47,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -72,7 +72,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// but as its recipient list which is dynamic-to we add new producers because we have register always
namesP = mbeanServer.queryNames(onP, null);
@@ -87,7 +87,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// and we still have the other producers, but not the one from the 2nd route that was removed
namesP = mbeanServer.queryNames(onP, null);
http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index 46656de..ededf75 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -59,7 +59,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -84,7 +84,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// but we should have one more producer
namesP = mbeanServer.queryNames(onP, null);
@@ -99,7 +99,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// and the 2nd producer should be removed
namesP = mbeanServer.queryNames(onP, null);
@@ -119,7 +119,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -144,7 +144,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// but as its recipient list which is dynamic-to we do not add a new producer
namesP = mbeanServer.queryNames(onP, null);
@@ -159,7 +159,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// and we still have the original producer
namesP = mbeanServer.queryNames(onP, null);
@@ -179,7 +179,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -204,7 +204,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// but as its recipient list which is dynamic-to we do not add a new producer
namesP = mbeanServer.queryNames(onP, null);
@@ -219,7 +219,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// and we still have the original producer
namesP = mbeanServer.queryNames(onP, null);
@@ -239,7 +239,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Adding 2nd route");
@@ -269,7 +269,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -281,7 +281,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Shutting down...");
}
@@ -297,7 +297,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Adding 2nd route");
@@ -328,7 +328,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -340,7 +340,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Shutting down...");
}
@@ -356,7 +356,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Adding 2nd route");
@@ -385,7 +385,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -397,7 +397,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Shutting down...");
}
@@ -413,7 +413,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Adding 2nd route");
@@ -443,7 +443,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -455,7 +455,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(9, names.size());
+ assertEquals(10, names.size());
log.info("Shutting down...");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b07edd0a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
index 4713c61..f423c84 100644
--- a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
+++ b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java
@@ -27,7 +27,7 @@ public class GatherAllStaticEndpointUrisTest extends ContextTestSupport {
RouteDefinition route = context.getRouteDefinition("foo");
Set<String> uris = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route, true, true);
assertNotNull(uris);
- assertEquals(5, uris.size());
+ assertEquals(4, uris.size());
RouteDefinition route2 = context.getRouteDefinition("bar");
Set<String> uris2 = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route2, true, true);